"Fossies" - the Fresh Open Source Software Archive

Member "keystone-18.0.0/keystone/tests/unit/identity/test_backend_sql.py" (14 Oct 2020, 45492 Bytes) of package /linux/misc/openstack/keystone-18.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_backend_sql.py": 17.0.0_vs_18.0.0.

    1 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 # not use this file except in compliance with the License. You may obtain
    3 # a copy of the License at
    4 #
    5 #      http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 # Unless required by applicable law or agreed to in writing, software
    8 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 # License for the specific language governing permissions and limitations
   11 # under the License.
   12 
   13 import datetime
   14 import uuid
   15 
   16 import freezegun
   17 import passlib.hash
   18 
   19 from keystone.common import password_hashing
   20 from keystone.common import provider_api
   21 from keystone.common import resource_options
   22 from keystone.common import sql
   23 import keystone.conf
   24 from keystone import exception
   25 from keystone.identity.backends import base
   26 from keystone.identity.backends import resource_options as iro
   27 from keystone.identity.backends import sql_model as model
   28 from keystone.tests.unit import test_backend_sql
   29 
   30 
   31 CONF = keystone.conf.CONF
   32 PROVIDERS = provider_api.ProviderAPIs
   33 
   34 
   35 class UserPasswordCreatedAtIntTests(test_backend_sql.SqlTests):
   36     def config_overrides(self):
   37         super(UserPasswordCreatedAtIntTests, self).config_overrides()
   38         self.config_fixture.config(group='security_compliance',
   39                                    password_expires_days=1)
   40 
   41     def test_user_password_created_expired_at_int_matches_created_at(self):
   42         with sql.session_for_read() as session:
   43             user_ref = PROVIDERS.identity_api._get_user(
   44                 session, self.user_foo['id']
   45             )
   46             self.assertIsNotNone(user_ref.password_ref._created_at)
   47             self.assertIsNotNone(user_ref.password_ref._expires_at)
   48             self.assertEqual(user_ref.password_ref._created_at,
   49                              user_ref.password_ref.created_at_int)
   50             self.assertEqual(user_ref.password_ref._expires_at,
   51                              user_ref.password_ref.expires_at_int)
   52             self.assertEqual(user_ref.password_ref.created_at,
   53                              user_ref.password_ref.created_at_int)
   54             self.assertEqual(user_ref.password_ref.expires_at,
   55                              user_ref.password_ref.expires_at_int)
   56 
   57 
   58 class UserPasswordHashingTestsNoCompat(test_backend_sql.SqlTests):
   59     def config_overrides(self):
   60         super(UserPasswordHashingTestsNoCompat, self).config_overrides()
   61         self.config_fixture.config(group='identity',
   62                                    password_hash_algorithm='scrypt')
   63 
   64     def test_configured_algorithm_used(self):
   65         with sql.session_for_read() as session:
   66             user_ref = PROVIDERS.identity_api._get_user(
   67                 session, self.user_foo['id']
   68             )
   69         self.assertEqual(
   70             passlib.hash.scrypt,
   71             password_hashing._get_hasher_from_ident(user_ref.password))
   72 
   73 
   74 class UserResourceOptionTests(test_backend_sql.SqlTests):
   75     def setUp(self):
   76         super(UserResourceOptionTests, self).setUp()
   77         # RESET STATE OF REGISTRY OPTIONS
   78         self.addCleanup(iro.register_user_options)
   79         self.addCleanup(iro.USER_OPTIONS_REGISTRY._registered_options.clear)
   80 
   81         self.option1 = resource_options.ResourceOption('opt1', 'option1')
   82         self.option2 = resource_options.ResourceOption('opt2', 'option2')
   83         self.cleanup_instance('option1', 'option2')
   84 
   85         iro.USER_OPTIONS_REGISTRY._registered_options.clear()
   86         iro.USER_OPTIONS_REGISTRY.register_option(self.option1)
   87         iro.USER_OPTIONS_REGISTRY.register_option(self.option2)
   88 
   89     def test_user_set_option_in_resource_option(self):
   90         user = self._create_user(self._get_user_dict())
   91         opt_value = uuid.uuid4().hex
   92         user['options'][self.option1.option_name] = opt_value
   93         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
   94         self.assertEqual(opt_value,
   95                          new_ref['options'][self.option1.option_name])
   96         raw_ref = self._get_user_ref(user['id'])
   97         self.assertIn(self.option1.option_id, raw_ref._resource_option_mapper)
   98         self.assertEqual(
   99             opt_value,
  100             raw_ref._resource_option_mapper[
  101                 self.option1.option_id].option_value)
  102         api_get_ref = PROVIDERS.identity_api.get_user(user['id'])
  103         # Ensure options are properly set in a .get_user call.
  104         self.assertEqual(opt_value,
  105                          api_get_ref['options'][self.option1.option_name])
  106 
  107     def test_user_add_update_delete_option_in_resource_option(self):
  108         user = self._create_user(self._get_user_dict())
  109 
  110         opt_value = uuid.uuid4().hex
  111         new_opt_value = uuid.uuid4().hex
  112 
  113         # Update user to add the new value option
  114         user['options'][self.option1.option_name] = opt_value
  115         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
  116         self.assertEqual(opt_value,
  117                          new_ref['options'][self.option1.option_name])
  118 
  119         # Update the option Value and confirm it is updated
  120         user['options'][self.option1.option_name] = new_opt_value
  121         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
  122         self.assertEqual(new_opt_value,
  123                          new_ref['options'][self.option1.option_name])
  124 
  125         # Set the option value to None, meaning delete the option
  126         user['options'][self.option1.option_name] = None
  127         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
  128         self.assertNotIn(self.option1.option_name, new_ref['options'])
  129 
  130     def test_user_add_delete_resource_option_existing_option_values(self):
  131         user = self._create_user(self._get_user_dict())
  132 
  133         opt_value = uuid.uuid4().hex
  134         opt2_value = uuid.uuid4().hex
  135 
  136         # Update user to add the new value option
  137         user['options'][self.option1.option_name] = opt_value
  138         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
  139         self.assertEqual(opt_value,
  140                          new_ref['options'][self.option1.option_name])
  141 
  142         # Update the option value for option 2 and confirm it is updated and
  143         # option1's value remains the same. Option 1 is not specified in the
  144         # updated user ref.
  145         del user['options'][self.option1.option_name]
  146         user['options'][self.option2.option_name] = opt2_value
  147         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
  148         self.assertEqual(opt_value,
  149                          new_ref['options'][self.option1.option_name])
  150         self.assertEqual(opt2_value,
  151                          new_ref['options'][self.option2.option_name])
  152         raw_ref = self._get_user_ref(user['id'])
  153         self.assertEqual(
  154             opt_value,
  155             raw_ref._resource_option_mapper[
  156                 self.option1.option_id].option_value)
  157         self.assertEqual(
  158             opt2_value,
  159             raw_ref._resource_option_mapper[
  160                 self.option2.option_id].option_value)
  161 
  162         # Set the option value to None, meaning delete the option, ensure
  163         # option 2 still remains and has the right value
  164         user['options'][self.option1.option_name] = None
  165         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
  166         self.assertNotIn(self.option1.option_name, new_ref['options'])
  167         self.assertEqual(opt2_value,
  168                          new_ref['options'][self.option2.option_name])
  169         raw_ref = self._get_user_ref(user['id'])
  170         self.assertNotIn(raw_ref._resource_option_mapper,
  171                          self.option1.option_id)
  172         self.assertEqual(
  173             opt2_value,
  174             raw_ref._resource_option_mapper[
  175                 self.option2.option_id].option_value)
  176 
  177     def test_unregistered_resource_option_deleted(self):
  178         user = self._create_user(self._get_user_dict())
  179 
  180         opt_value = uuid.uuid4().hex
  181         opt2_value = uuid.uuid4().hex
  182 
  183         # Update user to add the new value option
  184         user['options'][self.option1.option_name] = opt_value
  185         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
  186         self.assertEqual(opt_value,
  187                          new_ref['options'][self.option1.option_name])
  188 
  189         # Update the option value for option 2 and confirm it is updated and
  190         # option1's value remains the same. Option 1 is not specified in the
  191         # updated user ref.
  192         del user['options'][self.option1.option_name]
  193         user['options'][self.option2.option_name] = opt2_value
  194         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
  195         self.assertEqual(opt_value,
  196                          new_ref['options'][self.option1.option_name])
  197         self.assertEqual(opt2_value,
  198                          new_ref['options'][self.option2.option_name])
  199         raw_ref = self._get_user_ref(user['id'])
  200         self.assertEqual(
  201             opt_value,
  202             raw_ref._resource_option_mapper[
  203                 self.option1.option_id].option_value)
  204         self.assertEqual(
  205             opt2_value,
  206             raw_ref._resource_option_mapper[
  207                 self.option2.option_id].option_value)
  208 
  209         # clear registered options and only re-register option1, update user
  210         # and confirm option2 is gone from the ref and returned dict
  211         iro.USER_OPTIONS_REGISTRY._registered_options.clear()
  212         iro.USER_OPTIONS_REGISTRY.register_option(self.option1)
  213         user['name'] = uuid.uuid4().hex
  214         new_ref = PROVIDERS.identity_api.update_user(user['id'], user)
  215         self.assertNotIn(self.option2.option_name, new_ref['options'])
  216         self.assertEqual(opt_value,
  217                          new_ref['options'][self.option1.option_name])
  218         raw_ref = self._get_user_ref(user['id'])
  219         self.assertNotIn(raw_ref._resource_option_mapper,
  220                          self.option2.option_id)
  221         self.assertEqual(
  222             opt_value,
  223             raw_ref._resource_option_mapper[
  224                 self.option1.option_id].option_value)
  225 
  226     def _get_user_ref(self, user_id):
  227         with sql.session_for_read() as session:
  228             return session.query(model.User).get(user_id)
  229 
  230     def _create_user(self, user_dict):
  231         user_dict['id'] = uuid.uuid4().hex
  232         with sql.session_for_write() as session:
  233             user_ref = model.User.from_dict(user_dict)
  234             session.add(user_ref)
  235             return base.filter_user(user_ref.to_dict())
  236 
  237     def _get_user_dict(self):
  238         user = {
  239             'name': uuid.uuid4().hex,
  240             'domain_id': CONF.identity.default_domain_id,
  241             'enabled': True,
  242             'password': uuid.uuid4().hex
  243         }
  244         return user
  245 
  246 
  247 class DisableInactiveUserTests(test_backend_sql.SqlTests):
  248     def setUp(self):
  249         super(DisableInactiveUserTests, self).setUp()
  250         self.password = uuid.uuid4().hex
  251         self.user_dict = self._get_user_dict(self.password)
  252         self.max_inactive_days = 90
  253         self.config_fixture.config(
  254             group='security_compliance',
  255             disable_user_account_days_inactive=self.max_inactive_days)
  256 
  257     def test_authenticate_user_disabled_due_to_inactivity(self):
  258         # create user and set last_active_at beyond the max
  259         last_active_at = (
  260             datetime.datetime.utcnow() -
  261             datetime.timedelta(days=self.max_inactive_days + 1))
  262         user = self._create_user(self.user_dict, last_active_at.date())
  263         with self.make_request():
  264             self.assertRaises(exception.UserDisabled,
  265                               PROVIDERS.identity_api.authenticate,
  266                               user_id=user['id'],
  267                               password=self.password)
  268             # verify that the user is actually disabled
  269             user = PROVIDERS.identity_api.get_user(user['id'])
  270             self.assertFalse(user['enabled'])
  271             # set the user to enabled and authenticate
  272             user['enabled'] = True
  273             PROVIDERS.identity_api.update_user(user['id'], user)
  274             user = PROVIDERS.identity_api.authenticate(
  275                 user_id=user['id'], password=self.password
  276             )
  277             self.assertTrue(user['enabled'])
  278 
  279     def test_authenticate_user_not_disabled_due_to_inactivity(self):
  280         # create user and set last_active_at just below the max
  281         last_active_at = (
  282             datetime.datetime.utcnow() -
  283             datetime.timedelta(days=self.max_inactive_days - 1)).date()
  284         user = self._create_user(self.user_dict, last_active_at)
  285         with self.make_request():
  286             user = PROVIDERS.identity_api.authenticate(
  287                 user_id=user['id'], password=self.password
  288             )
  289         self.assertTrue(user['enabled'])
  290 
  291     def test_get_user_disabled_due_to_inactivity(self):
  292         user = PROVIDERS.identity_api.create_user(self.user_dict)
  293         # set last_active_at just beyond the max
  294         last_active_at = (
  295             datetime.datetime.utcnow() -
  296             datetime.timedelta(self.max_inactive_days + 1)).date()
  297         self._update_user_last_active_at(user['id'], last_active_at)
  298         # get user and verify that the user is actually disabled
  299         user = PROVIDERS.identity_api.get_user(user['id'])
  300         self.assertFalse(user['enabled'])
  301         # set enabled and test
  302         user['enabled'] = True
  303         PROVIDERS.identity_api.update_user(user['id'], user)
  304         user = PROVIDERS.identity_api.get_user(user['id'])
  305         self.assertTrue(user['enabled'])
  306 
  307     def test_get_user_not_disabled_due_to_inactivity(self):
  308         user = PROVIDERS.identity_api.create_user(self.user_dict)
  309         self.assertTrue(user['enabled'])
  310         # set last_active_at just below the max
  311         last_active_at = (
  312             datetime.datetime.utcnow() -
  313             datetime.timedelta(self.max_inactive_days - 1)).date()
  314         self._update_user_last_active_at(user['id'], last_active_at)
  315         # get user and verify that the user is still enabled
  316         user = PROVIDERS.identity_api.get_user(user['id'])
  317         self.assertTrue(user['enabled'])
  318 
  319     def test_enabled_after_create_update_user(self):
  320         self.config_fixture.config(group='security_compliance',
  321                                    disable_user_account_days_inactive=90)
  322         # create user without enabled; assert enabled
  323         del self.user_dict['enabled']
  324         user = PROVIDERS.identity_api.create_user(self.user_dict)
  325         user_ref = self._get_user_ref(user['id'])
  326         self.assertTrue(user_ref.enabled)
  327         now = datetime.datetime.utcnow().date()
  328         self.assertGreaterEqual(now, user_ref.last_active_at)
  329         # set enabled and test
  330         user['enabled'] = True
  331         PROVIDERS.identity_api.update_user(user['id'], user)
  332         user_ref = self._get_user_ref(user['id'])
  333         self.assertTrue(user_ref.enabled)
  334         # set disabled and test
  335         user['enabled'] = False
  336         PROVIDERS.identity_api.update_user(user['id'], user)
  337         user_ref = self._get_user_ref(user['id'])
  338         self.assertFalse(user_ref.enabled)
  339         # re-enable user and test
  340         user['enabled'] = True
  341         PROVIDERS.identity_api.update_user(user['id'], user)
  342         user_ref = self._get_user_ref(user['id'])
  343         self.assertTrue(user_ref.enabled)
  344 
  345     def test_ignore_user_inactivity(self):
  346         self.user_dict['options'] = {'ignore_user_inactivity': True}
  347         user = PROVIDERS.identity_api.create_user(
  348             self.user_dict)
  349         # set last_active_at just beyond the max
  350         last_active_at = (
  351             datetime.datetime.utcnow() -
  352             datetime.timedelta(self.max_inactive_days + 1)).date()
  353         self._update_user_last_active_at(user['id'], last_active_at)
  354         # get user and verify that the user is not disabled
  355         user = PROVIDERS.identity_api.get_user(user['id'])
  356         self.assertTrue(user['enabled'])
  357 
  358     def test_ignore_user_inactivity_with_user_disabled(self):
  359         user = PROVIDERS.identity_api.create_user(
  360             self.user_dict)
  361         # set last_active_at just beyond the max
  362         last_active_at = (
  363             datetime.datetime.utcnow() -
  364             datetime.timedelta(self.max_inactive_days + 1)).date()
  365         self._update_user_last_active_at(user['id'], last_active_at)
  366         # get user and verify that the user is disabled
  367         user = PROVIDERS.identity_api.get_user(user['id'])
  368         self.assertFalse(user['enabled'])
  369         # update disabled user with ignore_user_inactivity to true
  370         user['options'] = {'ignore_user_inactivity': True}
  371         user = PROVIDERS.identity_api.update_user(
  372             user['id'], user)
  373         # user is not enabled
  374         user = PROVIDERS.identity_api.get_user(user['id'])
  375         self.assertFalse(user['enabled'])
  376         # Manually set enabled and test
  377         user['enabled'] = True
  378         PROVIDERS.identity_api.update_user(user['id'], user)
  379         user = PROVIDERS.identity_api.get_user(user['id'])
  380         self.assertTrue(user['enabled'])
  381 
  382     def _get_user_dict(self, password):
  383         user = {
  384             'name': uuid.uuid4().hex,
  385             'domain_id': CONF.identity.default_domain_id,
  386             'enabled': True,
  387             'password': password
  388         }
  389         return user
  390 
  391     def _get_user_ref(self, user_id):
  392         with sql.session_for_read() as session:
  393             return session.query(model.User).get(user_id)
  394 
  395     def _create_user(self, user_dict, last_active_at):
  396         user_dict['id'] = uuid.uuid4().hex
  397         with sql.session_for_write() as session:
  398             user_ref = model.User.from_dict(user_dict)
  399             user_ref.last_active_at = last_active_at
  400             session.add(user_ref)
  401             return base.filter_user(user_ref.to_dict())
  402 
  403     def _update_user_last_active_at(self, user_id, last_active_at):
  404         with sql.session_for_write() as session:
  405             user_ref = session.query(model.User).get(user_id)
  406             user_ref.last_active_at = last_active_at
  407             return user_ref
  408 
  409 
  410 class PasswordHistoryValidationTests(test_backend_sql.SqlTests):
  411     def setUp(self):
  412         super(PasswordHistoryValidationTests, self).setUp()
  413         self.max_cnt = 3
  414         self.config_fixture.config(group='security_compliance',
  415                                    unique_last_password_count=self.max_cnt)
  416 
  417     def test_validate_password_history_with_invalid_password(self):
  418         password = uuid.uuid4().hex
  419         user = self._create_user(password)
  420         # Attempt to change to the same password
  421         with self.make_request():
  422             self.assertRaises(exception.PasswordValidationError,
  423                               PROVIDERS.identity_api.change_password,
  424                               user_id=user['id'],
  425                               original_password=password,
  426                               new_password=password)
  427             # Attempt to change to a unique password
  428             new_password = uuid.uuid4().hex
  429             self.assertValidChangePassword(user['id'], password, new_password)
  430             # Attempt to change back to the initial password
  431             self.assertRaises(exception.PasswordValidationError,
  432                               PROVIDERS.identity_api.change_password,
  433                               user_id=user['id'],
  434                               original_password=new_password,
  435                               new_password=password)
  436 
  437     def test_validate_password_history_with_valid_password(self):
  438         passwords = [uuid.uuid4().hex, uuid.uuid4().hex, uuid.uuid4().hex,
  439                      uuid.uuid4().hex]
  440         user = self._create_user(passwords[0])
  441         self.assertValidChangePassword(user['id'], passwords[0], passwords[1])
  442         self.assertValidChangePassword(user['id'], passwords[1], passwords[2])
  443         self.assertValidChangePassword(user['id'], passwords[2], passwords[3])
  444         # Now you should be able to change the password to match the initial
  445         # password because the password history only contains password elements
  446         # 1, 2, 3
  447         self.assertValidChangePassword(user['id'], passwords[3], passwords[0])
  448 
  449     def test_validate_password_history_with_valid_password_only_once(self):
  450         self.config_fixture.config(group='security_compliance',
  451                                    unique_last_password_count=1)
  452         passwords = [uuid.uuid4().hex, uuid.uuid4().hex]
  453         user = self._create_user(passwords[0])
  454         self.assertValidChangePassword(user['id'], passwords[0], passwords[1])
  455         self.assertValidChangePassword(user['id'], passwords[1], passwords[0])
  456 
  457     def test_validate_password_history_but_start_with_password_none(self):
  458         passwords = [uuid.uuid4().hex, uuid.uuid4().hex]
  459         # Create user and confirm password is None
  460         user = self._create_user(None)
  461         user_ref = self._get_user_ref(user['id'])
  462         self.assertIsNone(user_ref.password)
  463         # Admin password reset
  464         user['password'] = passwords[0]
  465         PROVIDERS.identity_api.update_user(user['id'], user)
  466         # Self-service change password
  467         self.assertValidChangePassword(user['id'], passwords[0], passwords[1])
  468         # Attempt to update with a previous password
  469         with self.make_request():
  470             self.assertRaises(exception.PasswordValidationError,
  471                               PROVIDERS.identity_api.change_password,
  472                               user_id=user['id'],
  473                               original_password=passwords[1],
  474                               new_password=passwords[0])
  475 
  476     def test_disable_password_history_and_repeat_same_password(self):
  477         self.config_fixture.config(group='security_compliance',
  478                                    unique_last_password_count=0)
  479         password = uuid.uuid4().hex
  480         user = self._create_user(password)
  481         # Repeatedly change password with the same password
  482         self.assertValidChangePassword(user['id'], password, password)
  483         self.assertValidChangePassword(user['id'], password, password)
  484 
  485     def test_admin_password_reset_is_not_validated_by_password_history(self):
  486         passwords = [uuid.uuid4().hex, uuid.uuid4().hex]
  487         user = self._create_user(passwords[0])
  488         # Attempt to change password to a unique password
  489         user['password'] = passwords[1]
  490         with self.make_request():
  491             PROVIDERS.identity_api.update_user(user['id'], user)
  492             PROVIDERS.identity_api.authenticate(
  493                 user_id=user['id'], password=passwords[1]
  494             )
  495             # Attempt to change password with the same password
  496             user['password'] = passwords[1]
  497             PROVIDERS.identity_api.update_user(user['id'], user)
  498             PROVIDERS.identity_api.authenticate(
  499                 user_id=user['id'], password=passwords[1]
  500             )
  501             # Attempt to change password with the initial password
  502             user['password'] = passwords[0]
  503             PROVIDERS.identity_api.update_user(user['id'], user)
  504             PROVIDERS.identity_api.authenticate(
  505                 user_id=user['id'], password=passwords[0]
  506             )
  507 
  508     def test_truncate_passwords(self):
  509         user = self._create_user(uuid.uuid4().hex)
  510         self._add_passwords_to_history(user, n=4)
  511         user_ref = self._get_user_ref(user['id'])
  512         self.assertEqual(
  513             len(user_ref.local_user.passwords), (self.max_cnt + 1))
  514 
  515     def test_truncate_passwords_when_max_is_default(self):
  516         self.max_cnt = 1
  517         expected_length = self.max_cnt + 1
  518         self.config_fixture.config(group='security_compliance',
  519                                    unique_last_password_count=self.max_cnt)
  520         user = self._create_user(uuid.uuid4().hex)
  521         self._add_passwords_to_history(user, n=4)
  522         user_ref = self._get_user_ref(user['id'])
  523         self.assertEqual(len(user_ref.local_user.passwords), expected_length)
  524         # Start with multiple passwords and then change max_cnt to one
  525         self.max_cnt = 4
  526         self.config_fixture.config(group='security_compliance',
  527                                    unique_last_password_count=self.max_cnt)
  528         self._add_passwords_to_history(user, n=self.max_cnt)
  529         user_ref = self._get_user_ref(user['id'])
  530         self.assertEqual(
  531             len(user_ref.local_user.passwords), (self.max_cnt + 1))
  532         self.max_cnt = 1
  533         self.config_fixture.config(group='security_compliance',
  534                                    unique_last_password_count=self.max_cnt)
  535         self._add_passwords_to_history(user, n=1)
  536         user_ref = self._get_user_ref(user['id'])
  537         self.assertEqual(len(user_ref.local_user.passwords), expected_length)
  538 
  539     def test_truncate_passwords_when_max_is_default_and_no_password(self):
  540         expected_length = 1
  541         self.max_cnt = 1
  542         self.config_fixture.config(group='security_compliance',
  543                                    unique_last_password_count=self.max_cnt)
  544         user = {
  545             'name': uuid.uuid4().hex,
  546             'domain_id': 'default',
  547             'enabled': True,
  548         }
  549         user = PROVIDERS.identity_api.create_user(user)
  550         self._add_passwords_to_history(user, n=1)
  551         user_ref = self._get_user_ref(user['id'])
  552         self.assertEqual(len(user_ref.local_user.passwords), expected_length)
  553 
  554     def _create_user(self, password):
  555         user = {
  556             'name': uuid.uuid4().hex,
  557             'domain_id': 'default',
  558             'enabled': True,
  559             'password': password
  560         }
  561         return PROVIDERS.identity_api.create_user(user)
  562 
  563     def assertValidChangePassword(self, user_id, password, new_password):
  564         with self.make_request():
  565             PROVIDERS.identity_api.change_password(
  566                 user_id=user_id, original_password=password,
  567                 new_password=new_password
  568             )
  569             PROVIDERS.identity_api.authenticate(
  570                 user_id=user_id, password=new_password
  571             )
  572 
  573     def _add_passwords_to_history(self, user, n):
  574         for _ in range(n):
  575             user['password'] = uuid.uuid4().hex
  576             PROVIDERS.identity_api.update_user(user['id'], user)
  577 
  578     def _get_user_ref(self, user_id):
  579         with sql.session_for_read() as session:
  580             return PROVIDERS.identity_api._get_user(session, user_id)
  581 
  582 
  583 class LockingOutUserTests(test_backend_sql.SqlTests):
  584     def setUp(self):
  585         super(LockingOutUserTests, self).setUp()
  586         self.config_fixture.config(
  587             group='security_compliance',
  588             lockout_failure_attempts=6)
  589         self.config_fixture.config(
  590             group='security_compliance',
  591             lockout_duration=5)
  592         # create user
  593         self.password = uuid.uuid4().hex
  594         user_dict = {
  595             'name': uuid.uuid4().hex,
  596             'domain_id': CONF.identity.default_domain_id,
  597             'enabled': True,
  598             'password': self.password
  599         }
  600         self.user = PROVIDERS.identity_api.create_user(user_dict)
  601 
  602     def test_locking_out_user_after_max_failed_attempts(self):
  603         with self.make_request():
  604             # authenticate with wrong password
  605             self.assertRaises(AssertionError,
  606                               PROVIDERS.identity_api.authenticate,
  607                               user_id=self.user['id'],
  608                               password=uuid.uuid4().hex)
  609             # authenticate with correct password
  610             PROVIDERS.identity_api.authenticate(
  611                 user_id=self.user['id'],
  612                 password=self.password
  613             )
  614             # test locking out user after max failed attempts
  615             self._fail_auth_repeatedly(self.user['id'])
  616             self.assertRaises(exception.AccountLocked,
  617                               PROVIDERS.identity_api.authenticate,
  618                               user_id=self.user['id'],
  619                               password=uuid.uuid4().hex)
  620 
  621     def test_lock_out_for_ignored_user(self):
  622         # mark the user as exempt from failed password attempts
  623         # ignore user and reset password, password not expired
  624         self.user['options'][iro.IGNORE_LOCKOUT_ATTEMPT_OPT.option_name] = True
  625         PROVIDERS.identity_api.update_user(self.user['id'], self.user)
  626 
  627         # fail authentication repeatedly the max number of times
  628         self._fail_auth_repeatedly(self.user['id'])
  629         # authenticate with wrong password, account should not be locked
  630         with self.make_request():
  631             self.assertRaises(AssertionError,
  632                               PROVIDERS.identity_api.authenticate,
  633                               user_id=self.user['id'],
  634                               password=uuid.uuid4().hex)
  635             # authenticate with correct password, account should not be locked
  636             PROVIDERS.identity_api.authenticate(
  637                 user_id=self.user['id'],
  638                 password=self.password
  639             )
  640 
  641     def test_set_enabled_unlocks_user(self):
  642         with self.make_request():
  643             # lockout user
  644             self._fail_auth_repeatedly(self.user['id'])
  645             self.assertRaises(exception.AccountLocked,
  646                               PROVIDERS.identity_api.authenticate,
  647                               user_id=self.user['id'],
  648                               password=uuid.uuid4().hex)
  649             # set enabled, user should be unlocked
  650             self.user['enabled'] = True
  651             PROVIDERS.identity_api.update_user(self.user['id'], self.user)
  652             user_ret = PROVIDERS.identity_api.authenticate(
  653                 user_id=self.user['id'],
  654                 password=self.password
  655             )
  656             self.assertTrue(user_ret['enabled'])
  657 
  658     def test_lockout_duration(self):
  659         # freeze time
  660         with freezegun.freeze_time(datetime.datetime.utcnow()) as frozen_time:
  661             with self.make_request():
  662                 # lockout user
  663                 self._fail_auth_repeatedly(self.user['id'])
  664                 self.assertRaises(exception.AccountLocked,
  665                                   PROVIDERS.identity_api.authenticate,
  666                                   user_id=self.user['id'],
  667                                   password=uuid.uuid4().hex)
  668                 # freeze time past the duration, user should be unlocked and
  669                 # failed auth count should get reset
  670                 frozen_time.tick(delta=datetime.timedelta(
  671                     seconds=CONF.security_compliance.lockout_duration + 1))
  672                 PROVIDERS.identity_api.authenticate(
  673                     user_id=self.user['id'],
  674                     password=self.password
  675                 )
  676                 # test failed auth count was reset by authenticating with the
  677                 # wrong password, should raise an assertion error and not
  678                 # account locked
  679                 self.assertRaises(AssertionError,
  680                                   PROVIDERS.identity_api.authenticate,
  681                                   user_id=self.user['id'],
  682                                   password=uuid.uuid4().hex)
  683 
  684     def test_lockout_duration_failed_auth_cnt_resets(self):
  685         # freeze time
  686         with freezegun.freeze_time(datetime.datetime.utcnow()) as frozen_time:
  687             with self.make_request():
  688                 # lockout user
  689                 self._fail_auth_repeatedly(self.user['id'])
  690                 self.assertRaises(exception.AccountLocked,
  691                                   PROVIDERS.identity_api.authenticate,
  692                                   user_id=self.user['id'],
  693                                   password=uuid.uuid4().hex)
  694                 # freeze time past the duration, failed_auth_cnt should reset
  695                 frozen_time.tick(delta=datetime.timedelta(
  696                     seconds=CONF.security_compliance.lockout_duration + 1))
  697                 # repeat failed auth the max times
  698                 self._fail_auth_repeatedly(self.user['id'])
  699                 # test user account is locked
  700                 self.assertRaises(exception.AccountLocked,
  701                                   PROVIDERS.identity_api.authenticate,
  702                                   user_id=self.user['id'],
  703                                   password=uuid.uuid4().hex)
  704 
  705     def _fail_auth_repeatedly(self, user_id):
  706         wrong_password = uuid.uuid4().hex
  707         for _ in range(CONF.security_compliance.lockout_failure_attempts):
  708             with self.make_request():
  709                 self.assertRaises(AssertionError,
  710                                   PROVIDERS.identity_api.authenticate,
  711                                   user_id=user_id,
  712                                   password=wrong_password)
  713 
  714 
  715 class PasswordExpiresValidationTests(test_backend_sql.SqlTests):
  716     def setUp(self):
  717         super(PasswordExpiresValidationTests, self).setUp()
  718         self.password = uuid.uuid4().hex
  719         self.user_dict = self._get_test_user_dict(self.password)
  720         self.config_fixture.config(
  721             group='security_compliance',
  722             password_expires_days=90)
  723 
  724     def test_authenticate_with_expired_password(self):
  725         # set password created_at so that the password will expire
  726         password_created_at = (
  727             datetime.datetime.utcnow() -
  728             datetime.timedelta(
  729                 days=CONF.security_compliance.password_expires_days + 1)
  730         )
  731         user = self._create_user(self.user_dict, password_created_at)
  732         # test password is expired
  733         with self.make_request():
  734             self.assertRaises(exception.PasswordExpired,
  735                               PROVIDERS.identity_api.authenticate,
  736                               user_id=user['id'],
  737                               password=self.password)
  738 
  739     def test_authenticate_with_non_expired_password(self):
  740         # set password created_at so that the password will not expire
  741         password_created_at = (
  742             datetime.datetime.utcnow() -
  743             datetime.timedelta(
  744                 days=CONF.security_compliance.password_expires_days - 1)
  745         )
  746         user = self._create_user(self.user_dict, password_created_at)
  747         # test password is not expired
  748         with self.make_request():
  749             PROVIDERS.identity_api.authenticate(
  750                 user_id=user['id'], password=self.password
  751             )
  752 
  753     def test_authenticate_with_expired_password_for_ignore_user_option(self):
  754         # set user to have the 'ignore_password_expiry' option set to False
  755         self.user_dict.setdefault('options', {})[
  756             iro.IGNORE_PASSWORD_EXPIRY_OPT.option_name] = False
  757         # set password created_at so that the password will expire
  758         password_created_at = (
  759             datetime.datetime.utcnow() -
  760             datetime.timedelta(
  761                 days=CONF.security_compliance.password_expires_days + 1)
  762         )
  763         user = self._create_user(self.user_dict, password_created_at)
  764         with self.make_request():
  765             self.assertRaises(exception.PasswordExpired,
  766                               PROVIDERS.identity_api.authenticate,
  767                               user_id=user['id'],
  768                               password=self.password)
  769 
  770             # update user to explicitly have the expiry option to True
  771             user['options'][
  772                 iro.IGNORE_PASSWORD_EXPIRY_OPT.option_name] = True
  773             user = PROVIDERS.identity_api.update_user(
  774                 user['id'], user
  775             )
  776             # test password is not expired due to ignore option
  777             PROVIDERS.identity_api.authenticate(
  778                 user_id=user['id'], password=self.password
  779             )
  780 
  781     def _get_test_user_dict(self, password):
  782         test_user_dict = {
  783             'id': uuid.uuid4().hex,
  784             'name': uuid.uuid4().hex,
  785             'domain_id': CONF.identity.default_domain_id,
  786             'enabled': True,
  787             'password': password
  788         }
  789         return test_user_dict
  790 
  791     def _create_user(self, user_dict, password_created_at):
  792         # Bypass business logic and go straight for the identity driver
  793         # (SQL in this case)
  794         driver = PROVIDERS.identity_api.driver
  795         driver.create_user(user_dict['id'], user_dict)
  796         with sql.session_for_write() as session:
  797             user_ref = session.query(model.User).get(user_dict['id'])
  798             user_ref.password_ref.created_at = password_created_at
  799             user_ref.password_ref.expires_at = (
  800                 user_ref._get_password_expires_at(password_created_at))
  801             return base.filter_user(user_ref.to_dict())
  802 
  803 
  804 class MinimumPasswordAgeTests(test_backend_sql.SqlTests):
  805     def setUp(self):
  806         super(MinimumPasswordAgeTests, self).setUp()
  807         self.config_fixture.config(
  808             group='security_compliance',
  809             minimum_password_age=1)
  810         self.initial_password = uuid.uuid4().hex
  811         self.user = self._create_new_user(self.initial_password)
  812 
  813     def test_user_cannot_change_password_before_min_age(self):
  814         # user can change password after create
  815         new_password = uuid.uuid4().hex
  816         self.assertValidChangePassword(self.user['id'], self.initial_password,
  817                                        new_password)
  818         # user cannot change password before min age
  819         with self.make_request():
  820             self.assertRaises(exception.PasswordAgeValidationError,
  821                               PROVIDERS.identity_api.change_password,
  822                               user_id=self.user['id'],
  823                               original_password=new_password,
  824                               new_password=uuid.uuid4().hex)
  825 
  826     def test_user_can_change_password_after_min_age(self):
  827         # user can change password after create
  828         new_password = uuid.uuid4().hex
  829         self.assertValidChangePassword(self.user['id'], self.initial_password,
  830                                        new_password)
  831         # set password_created_at so that the min password age has past
  832         password_created_at = (
  833             datetime.datetime.utcnow() -
  834             datetime.timedelta(
  835                 days=CONF.security_compliance.minimum_password_age + 1))
  836         self._update_password_created_at(self.user['id'], password_created_at)
  837         # user can change their password after min password age has past
  838         self.assertValidChangePassword(self.user['id'], new_password,
  839                                        uuid.uuid4().hex)
  840 
  841     def test_user_can_change_password_after_admin_reset(self):
  842         # user can change password after create
  843         new_password = uuid.uuid4().hex
  844         self.assertValidChangePassword(self.user['id'], self.initial_password,
  845                                        new_password)
  846         # user cannot change password before min age
  847 
  848         with self.make_request():
  849             self.assertRaises(exception.PasswordAgeValidationError,
  850                               PROVIDERS.identity_api.change_password,
  851                               user_id=self.user['id'],
  852                               original_password=new_password,
  853                               new_password=uuid.uuid4().hex)
  854         # admin reset
  855         new_password = uuid.uuid4().hex
  856         self.user['password'] = new_password
  857         PROVIDERS.identity_api.update_user(self.user['id'], self.user)
  858         # user can change password after admin reset
  859         self.assertValidChangePassword(self.user['id'], new_password,
  860                                        uuid.uuid4().hex)
  861 
  862     def assertValidChangePassword(self, user_id, password, new_password):
  863         with self.make_request():
  864             PROVIDERS.identity_api.change_password(
  865                 user_id=user_id, original_password=password,
  866                 new_password=new_password
  867             )
  868             PROVIDERS.identity_api.authenticate(
  869                 user_id=user_id, password=new_password
  870             )
  871 
  872     def _create_new_user(self, password):
  873         user = {
  874             'name': uuid.uuid4().hex,
  875             'domain_id': CONF.identity.default_domain_id,
  876             'enabled': True,
  877             'password': password
  878         }
  879         return PROVIDERS.identity_api.create_user(user)
  880 
  881     def _update_password_created_at(self, user_id, password_create_at):
  882         # User instance has an attribute password_ref. This attribute is used
  883         # in authentication. It always points to the last created password. The
  884         # order of passwords is determined by `created_at` field.
  885         # By changing `created_at`, this method interferes with password_ref
  886         # behaviour, making it return not last value. That's why all passwords
  887         # except the latest, need to have `created_at` slightly less than
  888         # the latest password.
  889         with sql.session_for_write() as session:
  890             user_ref = session.query(model.User).get(user_id)
  891             latest_password = user_ref.password_ref
  892             slightly_less = datetime.timedelta(minutes=1)
  893             for password_ref in user_ref.local_user.passwords:
  894                 password_ref.created_at = password_create_at - slightly_less
  895             latest_password.created_at = password_create_at
  896 
  897 
  898 class ChangePasswordRequiredAfterFirstUse(test_backend_sql.SqlTests):
  899     def _create_user(self, password, change_password_upon_first_use):
  900         self.config_fixture.config(
  901             group='security_compliance',
  902             change_password_upon_first_use=change_password_upon_first_use)
  903         user_dict = {
  904             'name': uuid.uuid4().hex,
  905             'domain_id': CONF.identity.default_domain_id,
  906             'enabled': True,
  907             'password': password
  908         }
  909         return PROVIDERS.identity_api.create_user(user_dict)
  910 
  911     def assertPasswordIsExpired(self, user_id, password):
  912         with self.make_request():
  913             self.assertRaises(exception.PasswordExpired,
  914                               PROVIDERS.identity_api.authenticate,
  915                               user_id=user_id,
  916                               password=password)
  917 
  918     def assertPasswordIsNotExpired(self, user_id, password):
  919         with self.make_request():
  920             PROVIDERS.identity_api.authenticate(
  921                 user_id=user_id, password=password
  922             )
  923 
  924     def test_password_expired_after_create(self):
  925         # create user, password expired
  926         initial_password = uuid.uuid4().hex
  927         user = self._create_user(initial_password, True)
  928         self.assertPasswordIsExpired(user['id'], initial_password)
  929         # change password (self-service), password not expired
  930         new_password = uuid.uuid4().hex
  931         with self.make_request():
  932             PROVIDERS.identity_api.change_password(
  933                 user['id'], initial_password, new_password
  934             )
  935         self.assertPasswordIsNotExpired(user['id'], new_password)
  936 
  937     def test_password_expired_after_reset(self):
  938         # create user with feature disabled, password not expired
  939         initial_password = uuid.uuid4().hex
  940         user = self._create_user(initial_password, False)
  941         self.assertPasswordIsNotExpired(user['id'], initial_password)
  942         # enable change_password_upon_first_use
  943         self.config_fixture.config(
  944             group='security_compliance',
  945             change_password_upon_first_use=True)
  946         # admin reset, password expired
  947         admin_password = uuid.uuid4().hex
  948         user['password'] = admin_password
  949         PROVIDERS.identity_api.update_user(user['id'], user)
  950         self.assertPasswordIsExpired(user['id'], admin_password)
  951         # change password (self-service), password not expired
  952         new_password = uuid.uuid4().hex
  953         with self.make_request():
  954             PROVIDERS.identity_api.change_password(
  955                 user['id'], admin_password, new_password
  956             )
  957         self.assertPasswordIsNotExpired(user['id'], new_password)
  958 
  959     def test_password_not_expired_when_feature_disabled(self):
  960         # create user with feature disabled
  961         initial_password = uuid.uuid4().hex
  962         user = self._create_user(initial_password, False)
  963         self.assertPasswordIsNotExpired(user['id'], initial_password)
  964         # admin reset
  965         admin_password = uuid.uuid4().hex
  966         user['password'] = admin_password
  967         PROVIDERS.identity_api.update_user(user['id'], user)
  968         self.assertPasswordIsNotExpired(user['id'], admin_password)
  969 
  970     def test_password_not_expired_for_ignore_user(self):
  971         # create user with feature disabled, password not expired
  972         initial_password = uuid.uuid4().hex
  973         user = self._create_user(initial_password, False)
  974         self.assertPasswordIsNotExpired(user['id'], initial_password)
  975         # enable change_password_upon_first_use
  976         self.config_fixture.config(
  977             group='security_compliance',
  978             change_password_upon_first_use=True)
  979         # ignore user and reset password, password not expired
  980         user['options'][iro.IGNORE_CHANGE_PASSWORD_OPT.option_name] = True
  981         admin_password = uuid.uuid4().hex
  982         user['password'] = admin_password
  983         PROVIDERS.identity_api.update_user(user['id'], user)
  984         self.assertPasswordIsNotExpired(user['id'], admin_password)
  985         # set ignore user to false and reset password, password is expired
  986         user['options'][iro.IGNORE_CHANGE_PASSWORD_OPT.option_name] = False
  987         admin_password = uuid.uuid4().hex
  988         user['password'] = admin_password
  989         PROVIDERS.identity_api.update_user(user['id'], user)
  990         self.assertPasswordIsExpired(user['id'], admin_password)