"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/unit/test_backend_sql.py" (13 May 2020, 60570 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_backend_sql.py": 16.0.1_vs_17.0.0.

    1 # Copyright 2012 OpenStack Foundation
    2 #
    3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 # not use this file except in compliance with the License. You may obtain
    5 # a copy of the License at
    6 #
    7 #      http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 # License for the specific language governing permissions and limitations
   13 # under the License.
   14 
   15 import datetime
   16 from unittest import mock
   17 import uuid
   18 
   19 import freezegun
   20 from oslo_db import exception as db_exception
   21 from oslo_db import options
   22 import sqlalchemy
   23 from sqlalchemy import exc
   24 from testtools import matchers
   25 
   26 from keystone.common import driver_hints
   27 from keystone.common import provider_api
   28 from keystone.common import sql
   29 from keystone.common.sql import core
   30 import keystone.conf
   31 from keystone.credential.providers import fernet as credential_provider
   32 from keystone import exception
   33 from keystone.identity.backends import sql_model as identity_sql
   34 from keystone.resource.backends import base as resource
   35 from keystone.tests import unit
   36 from keystone.tests.unit.assignment import test_backends as assignment_tests
   37 from keystone.tests.unit.catalog import test_backends as catalog_tests
   38 from keystone.tests.unit import default_fixtures
   39 from keystone.tests.unit.identity import test_backends as identity_tests
   40 from keystone.tests.unit import ksfixtures
   41 from keystone.tests.unit.ksfixtures import database
   42 from keystone.tests.unit.limit import test_backends as limit_tests
   43 from keystone.tests.unit.policy import test_backends as policy_tests
   44 from keystone.tests.unit.resource import test_backends as resource_tests
   45 from keystone.tests.unit.trust import test_backends as trust_tests
   46 from keystone.trust.backends import sql as trust_sql
   47 
   48 
   49 CONF = keystone.conf.CONF
   50 PROVIDERS = provider_api.ProviderAPIs
   51 
   52 
   53 class SqlTests(unit.SQLDriverOverrides, unit.TestCase):
   54 
   55     def setUp(self):
   56         super(SqlTests, self).setUp()
   57         self.useFixture(database.Database())
   58         self.load_backends()
   59 
   60         # populate the engine with tables & fixtures
   61         self.load_fixtures(default_fixtures)
   62         # defaulted by the data load
   63         self.user_foo['enabled'] = True
   64 
   65     def config_files(self):
   66         config_files = super(SqlTests, self).config_files()
   67         config_files.append(unit.dirs.tests_conf('backend_sql.conf'))
   68         return config_files
   69 
   70 
   71 class DataTypeRoundTrips(SqlTests):
   72     def test_json_blob_roundtrip(self):
   73         """Test round-trip of a JSON data structure with JsonBlob."""
   74         with sql.session_for_read() as session:
   75             val = session.scalar(
   76                 sqlalchemy.select(
   77                     [sqlalchemy.literal({"key": "value"}, type_=core.JsonBlob)]
   78                 )
   79             )
   80 
   81         self.assertEqual({"key": "value"}, val)
   82 
   83     def test_json_blob_sql_null(self):
   84         """Test that JsonBlob can accommodate a SQL NULL value in a result set.
   85 
   86         SQL NULL may be handled by JsonBlob in the case where a table is
   87         storing NULL in a JsonBlob column, as several models use this type
   88         in a column that is nullable.   It also comes back when the column
   89         is left NULL from being in an OUTER JOIN.  In Python, this means
   90         the None constant is handled by the datatype.
   91 
   92         """
   93         with sql.session_for_read() as session:
   94             val = session.scalar(
   95                 sqlalchemy.select(
   96                     [sqlalchemy.cast(sqlalchemy.null(), type_=core.JsonBlob)]
   97                 )
   98             )
   99 
  100         self.assertIsNone(val)
  101 
  102     def test_json_blob_python_none(self):
  103         """Test that JsonBlob round-trips a Python None.
  104 
  105         This is where JSON datatypes get a little nutty, in that JSON has
  106         a 'null' keyword, and JsonBlob right now will persist Python None
  107         as the json string 'null', not SQL NULL.
  108 
  109         """
  110         with sql.session_for_read() as session:
  111             val = session.scalar(
  112                 sqlalchemy.select(
  113                     [sqlalchemy.literal(None, type_=core.JsonBlob)]
  114                 )
  115             )
  116 
  117         self.assertIsNone(val)
  118 
  119     def test_json_blob_python_none_renders(self):
  120         """Test that JsonBlob actually renders JSON 'null' for Python None."""
  121         with sql.session_for_read() as session:
  122             val = session.scalar(
  123                 sqlalchemy.select(
  124                     [
  125                         sqlalchemy.cast(
  126                             sqlalchemy.literal(None, type_=core.JsonBlob),
  127                             sqlalchemy.String,
  128                         )
  129                     ]
  130                 )
  131             )
  132 
  133         self.assertEqual("null", val)
  134 
  135     def test_datetimeint_roundtrip(self):
  136         """Test round-trip of a Python datetime with DateTimeInt."""
  137         with sql.session_for_read() as session:
  138             datetime_value = datetime.datetime(2019, 5, 15, 10, 17, 55)
  139             val = session.scalar(
  140                 sqlalchemy.select(
  141                     [
  142                         sqlalchemy.literal(
  143                             datetime_value, type_=core.DateTimeInt
  144                         ),
  145                     ]
  146                 )
  147             )
  148 
  149         self.assertEqual(datetime_value, val)
  150 
  151     def test_datetimeint_persistence(self):
  152         """Test integer persistence with DateTimeInt."""
  153         with sql.session_for_read() as session:
  154             datetime_value = datetime.datetime(2019, 5, 15, 10, 17, 55)
  155             val = session.scalar(
  156                 sqlalchemy.select(
  157                     [
  158                         sqlalchemy.cast(
  159                             sqlalchemy.literal(
  160                                 datetime_value, type_=core.DateTimeInt
  161                             ),
  162                             sqlalchemy.Integer
  163                         )
  164                     ]
  165                 )
  166             )
  167 
  168         self.assertEqual(1557915475000000, val)
  169 
  170     def test_datetimeint_python_none(self):
  171         """Test round-trip of a Python None with DateTimeInt."""
  172         with sql.session_for_read() as session:
  173             val = session.scalar(
  174                 sqlalchemy.select(
  175                     [
  176                         sqlalchemy.literal(None, type_=core.DateTimeInt),
  177                     ]
  178                 )
  179             )
  180 
  181         self.assertIsNone(val)
  182 
  183 
  184 class SqlModels(SqlTests):
  185 
  186     def load_table(self, name):
  187         table = sqlalchemy.Table(name,
  188                                  sql.ModelBase.metadata,
  189                                  autoload=True)
  190         return table
  191 
  192     def assertExpectedSchema(self, table, expected_schema):
  193         """Assert that a table's schema is what we expect.
  194 
  195         :param string table: the name of the table to inspect
  196         :param tuple expected_schema: a tuple of tuples containing the
  197             expected schema
  198         :raises AssertionError: when the database schema doesn't match the
  199             expected schema
  200 
  201         The expected_schema format is simply::
  202 
  203             (
  204                 ('column name', sql type, qualifying detail),
  205                 ...
  206             )
  207 
  208         The qualifying detail varies based on the type of the column::
  209 
  210           - sql.Boolean columns must indicate the column's default value or
  211             None if there is no default
  212           - Columns with a length, like sql.String, must indicate the
  213             column's length
  214           - All other column types should use None
  215 
  216         Example::
  217 
  218             cols = (('id', sql.String, 64),
  219                     ('enabled', sql.Boolean, True),
  220                     ('extra', sql.JsonBlob, None))
  221             self.assertExpectedSchema('table_name', cols)
  222 
  223         """
  224         table = self.load_table(table)
  225 
  226         actual_schema = []
  227         for column in table.c:
  228             if isinstance(column.type, sql.Boolean):
  229                 default = None
  230                 if column.default:
  231                     default = column.default.arg
  232                 actual_schema.append((column.name, type(column.type), default))
  233             elif (hasattr(column.type, 'length') and
  234                     not isinstance(column.type, sql.Enum)):
  235                 # NOTE(dstanek): Even though sql.Enum columns have a length
  236                 # set we don't want to catch them here. Maybe in the future
  237                 # we'll check to see that they contain a list of the correct
  238                 # possible values.
  239                 actual_schema.append((column.name,
  240                                       type(column.type),
  241                                       column.type.length))
  242             else:
  243                 actual_schema.append((column.name, type(column.type), None))
  244 
  245         self.assertItemsEqual(expected_schema, actual_schema)
  246 
  247     def test_user_model(self):
  248         cols = (('id', sql.String, 64),
  249                 ('domain_id', sql.String, 64),
  250                 ('default_project_id', sql.String, 64),
  251                 ('enabled', sql.Boolean, None),
  252                 ('extra', sql.JsonBlob, None),
  253                 ('created_at', sql.DateTime, None),
  254                 ('last_active_at', sqlalchemy.Date, None))
  255         self.assertExpectedSchema('user', cols)
  256 
  257     def test_local_user_model(self):
  258         cols = (('id', sql.Integer, None),
  259                 ('user_id', sql.String, 64),
  260                 ('name', sql.String, 255),
  261                 ('domain_id', sql.String, 64),
  262                 ('failed_auth_count', sql.Integer, None),
  263                 ('failed_auth_at', sql.DateTime, None))
  264         self.assertExpectedSchema('local_user', cols)
  265 
  266     def test_password_model(self):
  267         cols = (('id', sql.Integer, None),
  268                 ('local_user_id', sql.Integer, None),
  269                 ('password_hash', sql.String, 255),
  270                 ('created_at', sql.DateTime, None),
  271                 ('expires_at', sql.DateTime, None),
  272                 ('created_at_int', sql.DateTimeInt, None),
  273                 ('expires_at_int', sql.DateTimeInt, None),
  274                 ('self_service', sql.Boolean, False))
  275         self.assertExpectedSchema('password', cols)
  276 
  277     def test_federated_user_model(self):
  278         cols = (('id', sql.Integer, None),
  279                 ('user_id', sql.String, 64),
  280                 ('idp_id', sql.String, 64),
  281                 ('protocol_id', sql.String, 64),
  282                 ('unique_id', sql.String, 255),
  283                 ('display_name', sql.String, 255))
  284         self.assertExpectedSchema('federated_user', cols)
  285 
  286     def test_nonlocal_user_model(self):
  287         cols = (('domain_id', sql.String, 64),
  288                 ('name', sql.String, 255),
  289                 ('user_id', sql.String, 64))
  290         self.assertExpectedSchema('nonlocal_user', cols)
  291 
  292     def test_group_model(self):
  293         cols = (('id', sql.String, 64),
  294                 ('name', sql.String, 64),
  295                 ('description', sql.Text, None),
  296                 ('domain_id', sql.String, 64),
  297                 ('extra', sql.JsonBlob, None))
  298         self.assertExpectedSchema('group', cols)
  299 
  300     def test_project_model(self):
  301         cols = (('id', sql.String, 64),
  302                 ('name', sql.String, 64),
  303                 ('description', sql.Text, None),
  304                 ('domain_id', sql.String, 64),
  305                 ('enabled', sql.Boolean, None),
  306                 ('extra', sql.JsonBlob, None),
  307                 ('parent_id', sql.String, 64),
  308                 ('is_domain', sql.Boolean, False))
  309         self.assertExpectedSchema('project', cols)
  310 
  311     def test_role_assignment_model(self):
  312         cols = (('type', sql.Enum, None),
  313                 ('actor_id', sql.String, 64),
  314                 ('target_id', sql.String, 64),
  315                 ('role_id', sql.String, 64),
  316                 ('inherited', sql.Boolean, False))
  317         self.assertExpectedSchema('assignment', cols)
  318 
  319     def test_user_group_membership(self):
  320         cols = (('group_id', sql.String, 64),
  321                 ('user_id', sql.String, 64))
  322         self.assertExpectedSchema('user_group_membership', cols)
  323 
  324     def test_revocation_event_model(self):
  325         cols = (('id', sql.Integer, None),
  326                 ('domain_id', sql.String, 64),
  327                 ('project_id', sql.String, 64),
  328                 ('user_id', sql.String, 64),
  329                 ('role_id', sql.String, 64),
  330                 ('trust_id', sql.String, 64),
  331                 ('consumer_id', sql.String, 64),
  332                 ('access_token_id', sql.String, 64),
  333                 ('issued_before', sql.DateTime, None),
  334                 ('expires_at', sql.DateTime, None),
  335                 ('revoked_at', sql.DateTime, None),
  336                 ('audit_id', sql.String, 32),
  337                 ('audit_chain_id', sql.String, 32))
  338         self.assertExpectedSchema('revocation_event', cols)
  339 
  340     def test_project_tags_model(self):
  341         cols = (('project_id', sql.String, 64),
  342                 ('name', sql.Unicode, 255))
  343         self.assertExpectedSchema('project_tag', cols)
  344 
  345 
  346 class SqlIdentity(SqlTests,
  347                   identity_tests.IdentityTests,
  348                   assignment_tests.AssignmentTests,
  349                   assignment_tests.SystemAssignmentTests,
  350                   resource_tests.ResourceTests):
  351     def test_password_hashed(self):
  352         with sql.session_for_read() as session:
  353             user_ref = PROVIDERS.identity_api._get_user(
  354                 session, self.user_foo['id']
  355             )
  356             self.assertNotEqual(self.user_foo['password'],
  357                                 user_ref['password'])
  358 
  359     def test_create_user_with_null_password(self):
  360         user_dict = unit.new_user_ref(
  361             domain_id=CONF.identity.default_domain_id)
  362         user_dict["password"] = None
  363         new_user_dict = PROVIDERS.identity_api.create_user(user_dict)
  364         with sql.session_for_read() as session:
  365             new_user_ref = PROVIDERS.identity_api._get_user(
  366                 session, new_user_dict['id']
  367             )
  368             self.assertIsNone(new_user_ref.password)
  369 
  370     def test_update_user_with_null_password(self):
  371         user_dict = unit.new_user_ref(
  372             domain_id=CONF.identity.default_domain_id)
  373         self.assertTrue(user_dict['password'])
  374         new_user_dict = PROVIDERS.identity_api.create_user(user_dict)
  375         new_user_dict["password"] = None
  376         new_user_dict = PROVIDERS.identity_api.update_user(
  377             new_user_dict['id'], new_user_dict
  378         )
  379         with sql.session_for_read() as session:
  380             new_user_ref = PROVIDERS.identity_api._get_user(
  381                 session, new_user_dict['id']
  382             )
  383             self.assertIsNone(new_user_ref.password)
  384 
  385     def test_delete_user_with_project_association(self):
  386         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  387         user = PROVIDERS.identity_api.create_user(user)
  388         role_member = unit.new_role_ref()
  389         PROVIDERS.role_api.create_role(role_member['id'], role_member)
  390         PROVIDERS.assignment_api.add_role_to_user_and_project(
  391             user['id'], self.project_bar['id'], role_member['id']
  392         )
  393         PROVIDERS.identity_api.delete_user(user['id'])
  394         self.assertRaises(exception.UserNotFound,
  395                           PROVIDERS.assignment_api.list_projects_for_user,
  396                           user['id'])
  397 
  398     def test_create_user_case_sensitivity(self):
  399         # user name case sensitivity is down to the fact that it is marked as
  400         # an SQL UNIQUE column, which may not be valid for other backends, like
  401         # LDAP.
  402 
  403         # create a ref with a lowercase name
  404         ref = unit.new_user_ref(name=uuid.uuid4().hex.lower(),
  405                                 domain_id=CONF.identity.default_domain_id)
  406         ref = PROVIDERS.identity_api.create_user(ref)
  407 
  408         # assign a new ID with the same name, but this time in uppercase
  409         ref['name'] = ref['name'].upper()
  410         PROVIDERS.identity_api.create_user(ref)
  411 
  412     def test_create_project_case_sensitivity(self):
  413         # project name case sensitivity is down to the fact that it is marked
  414         # as an SQL UNIQUE column, which may not be valid for other backends,
  415         # like LDAP.
  416 
  417         # create a ref with a lowercase name
  418         ref = unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  419         PROVIDERS.resource_api.create_project(ref['id'], ref)
  420 
  421         # assign a new ID with the same name, but this time in uppercase
  422         ref['id'] = uuid.uuid4().hex
  423         ref['name'] = ref['name'].upper()
  424         PROVIDERS.resource_api.create_project(ref['id'], ref)
  425 
  426     def test_delete_project_with_user_association(self):
  427         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  428         user = PROVIDERS.identity_api.create_user(user)
  429         role_member = unit.new_role_ref()
  430         PROVIDERS.role_api.create_role(role_member['id'], role_member)
  431         PROVIDERS.assignment_api.add_role_to_user_and_project(
  432             user['id'], self.project_bar['id'], role_member['id']
  433         )
  434         PROVIDERS.resource_api.delete_project(self.project_bar['id'])
  435         projects = PROVIDERS.assignment_api.list_projects_for_user(user['id'])
  436         self.assertEqual([], projects)
  437 
  438     def test_update_project_returns_extra(self):
  439         """Test for backward compatibility with an essex/folsom bug.
  440 
  441         Non-indexed attributes were returned in an 'extra' attribute, instead
  442         of on the entity itself; for consistency and backwards compatibility,
  443         those attributes should be included twice.
  444 
  445         This behavior is specific to the SQL driver.
  446 
  447         """
  448         arbitrary_key = uuid.uuid4().hex
  449         arbitrary_value = uuid.uuid4().hex
  450         project = unit.new_project_ref(
  451             domain_id=CONF.identity.default_domain_id)
  452         project[arbitrary_key] = arbitrary_value
  453         ref = PROVIDERS.resource_api.create_project(project['id'], project)
  454         self.assertEqual(arbitrary_value, ref[arbitrary_key])
  455         self.assertNotIn('extra', ref)
  456 
  457         ref['name'] = uuid.uuid4().hex
  458         ref = PROVIDERS.resource_api.update_project(ref['id'], ref)
  459         self.assertEqual(arbitrary_value, ref[arbitrary_key])
  460         self.assertEqual(arbitrary_value, ref['extra'][arbitrary_key])
  461 
  462     def test_update_user_returns_extra(self):
  463         """Test for backwards-compatibility with an essex/folsom bug.
  464 
  465         Non-indexed attributes were returned in an 'extra' attribute, instead
  466         of on the entity itself; for consistency and backwards compatibility,
  467         those attributes should be included twice.
  468 
  469         This behavior is specific to the SQL driver.
  470 
  471         """
  472         arbitrary_key = uuid.uuid4().hex
  473         arbitrary_value = uuid.uuid4().hex
  474         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  475         user[arbitrary_key] = arbitrary_value
  476         del user["id"]
  477         ref = PROVIDERS.identity_api.create_user(user)
  478         self.assertEqual(arbitrary_value, ref[arbitrary_key])
  479         self.assertNotIn('password', ref)
  480         self.assertNotIn('extra', ref)
  481 
  482         user['name'] = uuid.uuid4().hex
  483         user['password'] = uuid.uuid4().hex
  484         ref = PROVIDERS.identity_api.update_user(ref['id'], user)
  485         self.assertNotIn('password', ref)
  486         self.assertNotIn('password', ref['extra'])
  487         self.assertEqual(arbitrary_value, ref[arbitrary_key])
  488         self.assertEqual(arbitrary_value, ref['extra'][arbitrary_key])
  489 
  490     def test_sql_user_to_dict_null_default_project_id(self):
  491         user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  492         user = PROVIDERS.identity_api.create_user(user)
  493         with sql.session_for_read() as session:
  494             query = session.query(identity_sql.User)
  495             query = query.filter_by(id=user['id'])
  496             raw_user_ref = query.one()
  497             self.assertIsNone(raw_user_ref.default_project_id)
  498             user_ref = raw_user_ref.to_dict()
  499             self.assertNotIn('default_project_id', user_ref)
  500             session.close()
  501 
  502     def test_list_domains_for_user(self):
  503         domain = unit.new_domain_ref()
  504         PROVIDERS.resource_api.create_domain(domain['id'], domain)
  505         user = unit.new_user_ref(domain_id=domain['id'])
  506 
  507         test_domain1 = unit.new_domain_ref()
  508         PROVIDERS.resource_api.create_domain(test_domain1['id'], test_domain1)
  509         test_domain2 = unit.new_domain_ref()
  510         PROVIDERS.resource_api.create_domain(test_domain2['id'], test_domain2)
  511 
  512         user = PROVIDERS.identity_api.create_user(user)
  513         user_domains = PROVIDERS.assignment_api.list_domains_for_user(
  514             user['id']
  515         )
  516         self.assertEqual(0, len(user_domains))
  517         PROVIDERS.assignment_api.create_grant(
  518             user_id=user['id'], domain_id=test_domain1['id'],
  519             role_id=self.role_member['id']
  520         )
  521         PROVIDERS.assignment_api.create_grant(
  522             user_id=user['id'], domain_id=test_domain2['id'],
  523             role_id=self.role_member['id']
  524         )
  525         user_domains = PROVIDERS.assignment_api.list_domains_for_user(
  526             user['id']
  527         )
  528         self.assertThat(user_domains, matchers.HasLength(2))
  529 
  530     def test_list_domains_for_user_with_grants(self):
  531         # Create two groups each with a role on a different domain, and
  532         # make user1 a member of both groups.  Both these new domains
  533         # should now be included, along with any direct user grants.
  534         domain = unit.new_domain_ref()
  535         PROVIDERS.resource_api.create_domain(domain['id'], domain)
  536         user = unit.new_user_ref(domain_id=domain['id'])
  537         user = PROVIDERS.identity_api.create_user(user)
  538         group1 = unit.new_group_ref(domain_id=domain['id'])
  539         group1 = PROVIDERS.identity_api.create_group(group1)
  540         group2 = unit.new_group_ref(domain_id=domain['id'])
  541         group2 = PROVIDERS.identity_api.create_group(group2)
  542 
  543         test_domain1 = unit.new_domain_ref()
  544         PROVIDERS.resource_api.create_domain(test_domain1['id'], test_domain1)
  545         test_domain2 = unit.new_domain_ref()
  546         PROVIDERS.resource_api.create_domain(test_domain2['id'], test_domain2)
  547         test_domain3 = unit.new_domain_ref()
  548         PROVIDERS.resource_api.create_domain(test_domain3['id'], test_domain3)
  549 
  550         PROVIDERS.identity_api.add_user_to_group(user['id'], group1['id'])
  551         PROVIDERS.identity_api.add_user_to_group(user['id'], group2['id'])
  552 
  553         # Create 3 grants, one user grant, the other two as group grants
  554         PROVIDERS.assignment_api.create_grant(
  555             user_id=user['id'], domain_id=test_domain1['id'],
  556             role_id=self.role_member['id']
  557         )
  558         PROVIDERS.assignment_api.create_grant(
  559             group_id=group1['id'], domain_id=test_domain2['id'],
  560             role_id=self.role_admin['id']
  561         )
  562         PROVIDERS.assignment_api.create_grant(
  563             group_id=group2['id'], domain_id=test_domain3['id'],
  564             role_id=self.role_admin['id']
  565         )
  566         user_domains = PROVIDERS.assignment_api.list_domains_for_user(
  567             user['id']
  568         )
  569         self.assertThat(user_domains, matchers.HasLength(3))
  570 
  571     def test_list_domains_for_user_with_inherited_grants(self):
  572         """Test that inherited roles on the domain are excluded.
  573 
  574         Test Plan:
  575 
  576         - Create two domains, one user, group and role
  577         - Domain1 is given an inherited user role, Domain2 an inherited
  578           group role (for a group of which the user is a member)
  579         - When listing domains for user, neither domain should be returned
  580 
  581         """
  582         domain1 = unit.new_domain_ref()
  583         domain1 = PROVIDERS.resource_api.create_domain(domain1['id'], domain1)
  584         domain2 = unit.new_domain_ref()
  585         domain2 = PROVIDERS.resource_api.create_domain(domain2['id'], domain2)
  586         user = unit.new_user_ref(domain_id=domain1['id'])
  587         user = PROVIDERS.identity_api.create_user(user)
  588         group = unit.new_group_ref(domain_id=domain1['id'])
  589         group = PROVIDERS.identity_api.create_group(group)
  590         PROVIDERS.identity_api.add_user_to_group(user['id'], group['id'])
  591         role = unit.new_role_ref()
  592         PROVIDERS.role_api.create_role(role['id'], role)
  593 
  594         # Create a grant on each domain, one user grant, one group grant,
  595         # both inherited.
  596         PROVIDERS.assignment_api.create_grant(
  597             user_id=user['id'], domain_id=domain1['id'], role_id=role['id'],
  598             inherited_to_projects=True
  599         )
  600         PROVIDERS.assignment_api.create_grant(
  601             group_id=group['id'], domain_id=domain2['id'], role_id=role['id'],
  602             inherited_to_projects=True
  603         )
  604 
  605         user_domains = PROVIDERS.assignment_api.list_domains_for_user(
  606             user['id']
  607         )
  608         # No domains should be returned since both domains have only inherited
  609         # roles assignments.
  610         self.assertThat(user_domains, matchers.HasLength(0))
  611 
  612     def test_list_groups_for_user(self):
  613         domain = self._get_domain_fixture()
  614         test_groups = []
  615         test_users = []
  616         GROUP_COUNT = 3
  617         USER_COUNT = 2
  618 
  619         for x in range(0, USER_COUNT):
  620             new_user = unit.new_user_ref(domain_id=domain['id'])
  621             new_user = PROVIDERS.identity_api.create_user(new_user)
  622             test_users.append(new_user)
  623         positive_user = test_users[0]
  624         negative_user = test_users[1]
  625 
  626         for x in range(0, USER_COUNT):
  627             group_refs = PROVIDERS.identity_api.list_groups_for_user(
  628                 test_users[x]['id'])
  629             self.assertEqual(0, len(group_refs))
  630 
  631         for x in range(0, GROUP_COUNT):
  632             before_count = x
  633             after_count = x + 1
  634             new_group = unit.new_group_ref(domain_id=domain['id'])
  635             new_group = PROVIDERS.identity_api.create_group(new_group)
  636             test_groups.append(new_group)
  637 
  638             # add the user to the group and ensure that the
  639             # group count increases by one for each
  640             group_refs = PROVIDERS.identity_api.list_groups_for_user(
  641                 positive_user['id'])
  642             self.assertEqual(before_count, len(group_refs))
  643             PROVIDERS.identity_api.add_user_to_group(
  644                 positive_user['id'],
  645                 new_group['id'])
  646             group_refs = PROVIDERS.identity_api.list_groups_for_user(
  647                 positive_user['id'])
  648             self.assertEqual(after_count, len(group_refs))
  649 
  650             # Make sure the group count for the unrelated user did not change
  651             group_refs = PROVIDERS.identity_api.list_groups_for_user(
  652                 negative_user['id'])
  653             self.assertEqual(0, len(group_refs))
  654 
  655         # remove the user from each group and ensure that
  656         # the group count reduces by one for each
  657         for x in range(0, 3):
  658             before_count = GROUP_COUNT - x
  659             after_count = GROUP_COUNT - x - 1
  660             group_refs = PROVIDERS.identity_api.list_groups_for_user(
  661                 positive_user['id'])
  662             self.assertEqual(before_count, len(group_refs))
  663             PROVIDERS.identity_api.remove_user_from_group(
  664                 positive_user['id'],
  665                 test_groups[x]['id'])
  666             group_refs = PROVIDERS.identity_api.list_groups_for_user(
  667                 positive_user['id'])
  668             self.assertEqual(after_count, len(group_refs))
  669             # Make sure the group count for the unrelated user
  670             # did not change
  671             group_refs = PROVIDERS.identity_api.list_groups_for_user(
  672                 negative_user['id'])
  673             self.assertEqual(0, len(group_refs))
  674 
  675     def test_add_user_to_group_expiring_mapped(self):
  676         self._build_fed_resource()
  677         domain = self._get_domain_fixture()
  678         self.config_fixture.config(group='federation',
  679                                    default_authorization_ttl=5)
  680         time = datetime.datetime.utcnow()
  681         tick = datetime.timedelta(minutes=5)
  682 
  683         new_group = unit.new_group_ref(domain_id=domain['id'])
  684         new_group = PROVIDERS.identity_api.create_group(new_group)
  685 
  686         fed_dict = unit.new_federated_user_ref()
  687         fed_dict['idp_id'] = 'myidp'
  688         fed_dict['protocol_id'] = 'mapped'
  689 
  690         with freezegun.freeze_time(time - tick) as frozen_time:
  691             user = PROVIDERS.identity_api.shadow_federated_user(
  692                 **fed_dict, group_ids=[new_group['id']])
  693 
  694             PROVIDERS.identity_api.check_user_in_group(user['id'],
  695                                                        new_group['id'])
  696 
  697             # Expiration
  698             frozen_time.tick(tick)
  699             self.assertRaises(exception.NotFound,
  700                               PROVIDERS.identity_api.check_user_in_group,
  701                               user['id'],
  702                               new_group['id'])
  703 
  704             # Renewal
  705             PROVIDERS.identity_api.shadow_federated_user(
  706                 **fed_dict, group_ids=[new_group['id']])
  707             PROVIDERS.identity_api.check_user_in_group(user['id'],
  708                                                        new_group['id'])
  709 
  710     def test_add_user_to_group_expiring(self):
  711         self._build_fed_resource()
  712         domain = self._get_domain_fixture()
  713         time = datetime.datetime.utcnow()
  714         tick = datetime.timedelta(minutes=5)
  715 
  716         new_group = unit.new_group_ref(domain_id=domain['id'])
  717         new_group = PROVIDERS.identity_api.create_group(new_group)
  718 
  719         fed_dict = unit.new_federated_user_ref()
  720         fed_dict['idp_id'] = 'myidp'
  721         fed_dict['protocol_id'] = 'mapped'
  722         new_user = PROVIDERS.shadow_users_api.create_federated_user(
  723             domain['id'], fed_dict
  724         )
  725 
  726         with freezegun.freeze_time(time - tick) as frozen_time:
  727             PROVIDERS.shadow_users_api.add_user_to_group_expires(
  728                 new_user['id'], new_group['id'])
  729 
  730             self.config_fixture.config(group='federation',
  731                                        default_authorization_ttl=0)
  732             self.assertRaises(exception.NotFound,
  733                               PROVIDERS.identity_api.check_user_in_group,
  734                               new_user['id'],
  735                               new_group['id'])
  736 
  737             self.config_fixture.config(group='federation',
  738                                        default_authorization_ttl=5)
  739             PROVIDERS.identity_api.check_user_in_group(new_user['id'],
  740                                                        new_group['id'])
  741 
  742             # Expiration
  743             frozen_time.tick(tick)
  744             self.assertRaises(exception.NotFound,
  745                               PROVIDERS.identity_api.check_user_in_group,
  746                               new_user['id'],
  747                               new_group['id'])
  748 
  749             # Renewal
  750             PROVIDERS.shadow_users_api.add_user_to_group_expires(
  751                 new_user['id'], new_group['id'])
  752             PROVIDERS.identity_api.check_user_in_group(new_user['id'],
  753                                                        new_group['id'])
  754 
  755     def test_add_user_to_group_expiring_list(self):
  756         self._build_fed_resource()
  757         domain = self._get_domain_fixture()
  758         self.config_fixture.config(group='federation',
  759                                    default_authorization_ttl=5)
  760         time = datetime.datetime.utcnow()
  761         tick = datetime.timedelta(minutes=5)
  762 
  763         new_group = unit.new_group_ref(domain_id=domain['id'])
  764         new_group = PROVIDERS.identity_api.create_group(new_group)
  765         exp_new_group = unit.new_group_ref(domain_id=domain['id'])
  766         exp_new_group = PROVIDERS.identity_api.create_group(exp_new_group)
  767 
  768         fed_dict = unit.new_federated_user_ref()
  769         fed_dict['idp_id'] = 'myidp'
  770         fed_dict['protocol_id'] = 'mapped'
  771         new_user = PROVIDERS.shadow_users_api.create_federated_user(
  772             domain['id'], fed_dict
  773         )
  774 
  775         PROVIDERS.identity_api.add_user_to_group(new_user['id'],
  776                                                  new_group['id'])
  777         PROVIDERS.identity_api.check_user_in_group(new_user['id'],
  778                                                    new_group['id'])
  779 
  780         with freezegun.freeze_time(time - tick) as frozen_time:
  781             PROVIDERS.shadow_users_api.add_user_to_group_expires(
  782                 new_user['id'], exp_new_group['id'])
  783             PROVIDERS.identity_api.check_user_in_group(new_user['id'],
  784                                                        new_group['id'])
  785 
  786             groups = PROVIDERS.identity_api.list_groups_for_user(
  787                 new_user['id'])
  788             self.assertEqual(len(groups), 2)
  789             for group in groups:
  790                 if group.get('membership_expires_at'):
  791                     self.assertEqual(group['membership_expires_at'], time)
  792 
  793             frozen_time.tick(tick)
  794             groups = PROVIDERS.identity_api.list_groups_for_user(
  795                 new_user['id'])
  796             self.assertEqual(len(groups), 1)
  797 
  798     def test_storing_null_domain_id_in_project_ref(self):
  799         """Test the special storage of domain_id=None in sql resource driver.
  800 
  801         The resource driver uses a special value in place of None for domain_id
  802         in the project record. This shouldn't escape the driver. Hence we test
  803         the interface to ensure that you can store a domain_id of None, and
  804         that any special value used inside the driver does not escape through
  805         the interface.
  806 
  807         """
  808         spoiler_project = unit.new_project_ref(
  809             domain_id=CONF.identity.default_domain_id)
  810         PROVIDERS.resource_api.create_project(
  811             spoiler_project['id'], spoiler_project
  812         )
  813 
  814         # First let's create a project with a None domain_id and make sure we
  815         # can read it back.
  816         project = unit.new_project_ref(domain_id=None, is_domain=True)
  817         project = PROVIDERS.resource_api.create_project(project['id'], project)
  818         ref = PROVIDERS.resource_api.get_project(project['id'])
  819         self.assertDictEqual(project, ref)
  820 
  821         # Can we get it by name?
  822         ref = PROVIDERS.resource_api.get_project_by_name(project['name'], None)
  823         self.assertDictEqual(project, ref)
  824 
  825         # Can we filter for them - create a second domain to ensure we are
  826         # testing the receipt of more than one.
  827         project2 = unit.new_project_ref(domain_id=None, is_domain=True)
  828         project2 = PROVIDERS.resource_api.create_project(
  829             project2['id'], project2
  830         )
  831         hints = driver_hints.Hints()
  832         hints.add_filter('domain_id', None)
  833         refs = PROVIDERS.resource_api.list_projects(hints)
  834         self.assertThat(refs, matchers.HasLength(2 + self.domain_count))
  835         self.assertIn(project, refs)
  836         self.assertIn(project2, refs)
  837 
  838         # Can we update it?
  839         project['name'] = uuid.uuid4().hex
  840         PROVIDERS.resource_api.update_project(project['id'], project)
  841         ref = PROVIDERS.resource_api.get_project(project['id'])
  842         self.assertDictEqual(project, ref)
  843 
  844         # Finally, make sure we can delete it
  845         project['enabled'] = False
  846         PROVIDERS.resource_api.update_project(project['id'], project)
  847         PROVIDERS.resource_api.delete_project(project['id'])
  848         self.assertRaises(exception.ProjectNotFound,
  849                           PROVIDERS.resource_api.get_project,
  850                           project['id'])
  851 
  852     def test_hidden_project_domain_root_is_really_hidden(self):
  853         """Ensure we cannot access the hidden root of all project domains.
  854 
  855         Calling any of the driver methods should result in the same as
  856         would be returned if we passed a project that does not exist. We don't
  857         test create_project, since we do not allow a caller of our API to
  858         specify their own ID for a new entity.
  859 
  860         """
  861         def _exercise_project_api(ref_id):
  862             driver = PROVIDERS.resource_api.driver
  863             self.assertRaises(exception.ProjectNotFound,
  864                               driver.get_project,
  865                               ref_id)
  866 
  867             self.assertRaises(exception.ProjectNotFound,
  868                               driver.get_project_by_name,
  869                               resource.NULL_DOMAIN_ID,
  870                               ref_id)
  871 
  872             project_ids = [x['id'] for x in
  873                            driver.list_projects(driver_hints.Hints())]
  874             self.assertNotIn(ref_id, project_ids)
  875 
  876             projects = driver.list_projects_from_ids([ref_id])
  877             self.assertThat(projects, matchers.HasLength(0))
  878 
  879             project_ids = [x for x in
  880                            driver.list_project_ids_from_domain_ids([ref_id])]
  881             self.assertNotIn(ref_id, project_ids)
  882 
  883             self.assertRaises(exception.DomainNotFound,
  884                               driver.list_projects_in_domain,
  885                               ref_id)
  886 
  887             project_ids = [
  888                 x['id'] for x in
  889                 driver.list_projects_acting_as_domain(driver_hints.Hints())]
  890             self.assertNotIn(ref_id, project_ids)
  891 
  892             projects = driver.list_projects_in_subtree(ref_id)
  893             self.assertThat(projects, matchers.HasLength(0))
  894 
  895             self.assertRaises(exception.ProjectNotFound,
  896                               driver.list_project_parents,
  897                               ref_id)
  898 
  899             # A non-existing project just returns True from the driver
  900             self.assertTrue(driver.is_leaf_project(ref_id))
  901 
  902             self.assertRaises(exception.ProjectNotFound,
  903                               driver.update_project,
  904                               ref_id,
  905                               {})
  906 
  907             self.assertRaises(exception.ProjectNotFound,
  908                               driver.delete_project,
  909                               ref_id)
  910 
  911             # Deleting list of projects that includes a non-existing project
  912             # should be silent. The root domain <<keystone.domain.root>> can't
  913             # be deleted.
  914             if ref_id != resource.NULL_DOMAIN_ID:
  915                 driver.delete_projects_from_ids([ref_id])
  916 
  917         _exercise_project_api(uuid.uuid4().hex)
  918         _exercise_project_api(resource.NULL_DOMAIN_ID)
  919 
  920     def test_list_users_call_count(self):
  921         """There should not be O(N) queries."""
  922         # create 10 users. 10 is just a random number
  923         for i in range(10):
  924             user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  925             PROVIDERS.identity_api.create_user(user)
  926 
  927         # sqlalchemy emits various events and allows to listen to them. Here
  928         # bound method `query_counter` will be called each time when a query
  929         # is compiled
  930         class CallCounter(object):
  931             def __init__(self):
  932                 self.calls = 0
  933 
  934             def reset(self):
  935                 self.calls = 0
  936 
  937             def query_counter(self, query):
  938                 self.calls += 1
  939 
  940         counter = CallCounter()
  941         sqlalchemy.event.listen(sqlalchemy.orm.query.Query, 'before_compile',
  942                                 counter.query_counter)
  943 
  944         first_call_users = PROVIDERS.identity_api.list_users()
  945         first_call_counter = counter.calls
  946         # add 10 more users
  947         for i in range(10):
  948             user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  949             PROVIDERS.identity_api.create_user(user)
  950         counter.reset()
  951         second_call_users = PROVIDERS.identity_api.list_users()
  952         # ensure that the number of calls does not depend on the number of
  953         # users fetched.
  954         self.assertNotEqual(len(first_call_users), len(second_call_users))
  955         self.assertEqual(first_call_counter, counter.calls)
  956         self.assertEqual(3, counter.calls)
  957 
  958     def test_check_project_depth(self):
  959         # Create a 3 level project tree:
  960         #
  961         # default_domain
  962         #       |
  963         #   project_1
  964         #       |
  965         #   project_2
  966         project_1 = unit.new_project_ref(
  967             domain_id=CONF.identity.default_domain_id)
  968         PROVIDERS.resource_api.create_project(project_1['id'], project_1)
  969         project_2 = unit.new_project_ref(
  970             domain_id=CONF.identity.default_domain_id,
  971             parent_id=project_1['id'])
  972         PROVIDERS.resource_api.create_project(project_2['id'], project_2)
  973 
  974         # if max_depth is None or >= current project depth, return nothing.
  975         resp = PROVIDERS.resource_api.check_project_depth(max_depth=None)
  976         self.assertIsNone(resp)
  977         resp = PROVIDERS.resource_api.check_project_depth(max_depth=3)
  978         self.assertIsNone(resp)
  979         resp = PROVIDERS.resource_api.check_project_depth(max_depth=4)
  980         self.assertIsNone(resp)
  981         # if max_depth < current project depth, raise LimitTreeExceedError
  982         self.assertRaises(exception.LimitTreeExceedError,
  983                           PROVIDERS.resource_api.check_project_depth,
  984                           2)
  985 
  986 
  987 class SqlTrust(SqlTests, trust_tests.TrustTests):
  988 
  989     def test_trust_expires_at_int_matches_expires_at(self):
  990         with sql.session_for_write() as session:
  991             new_id = uuid.uuid4().hex
  992             self.create_sample_trust(new_id)
  993             trust_ref = session.query(trust_sql.TrustModel).get(new_id)
  994             self.assertIsNotNone(trust_ref._expires_at)
  995             self.assertEqual(trust_ref._expires_at, trust_ref.expires_at_int)
  996             self.assertEqual(trust_ref.expires_at, trust_ref.expires_at_int)
  997 
  998 
  999 class SqlCatalog(SqlTests, catalog_tests.CatalogTests):
 1000 
 1001     _legacy_endpoint_id_in_endpoint = True
 1002     _enabled_default_to_true_when_creating_endpoint = True
 1003 
 1004     def test_get_v3_catalog_project_non_exist(self):
 1005         service = unit.new_service_ref()
 1006         PROVIDERS.catalog_api.create_service(service['id'], service)
 1007 
 1008         malformed_url = "http://192.168.1.104:8774/v2/$(project)s"
 1009         endpoint = unit.new_endpoint_ref(service_id=service['id'],
 1010                                          url=malformed_url,
 1011                                          region_id=None)
 1012         PROVIDERS.catalog_api.create_endpoint(endpoint['id'], endpoint.copy())
 1013         self.assertRaises(exception.ProjectNotFound,
 1014                           PROVIDERS.catalog_api.get_v3_catalog,
 1015                           'fake-user',
 1016                           'fake-project')
 1017 
 1018     def test_get_v3_catalog_with_empty_public_url(self):
 1019         service = unit.new_service_ref()
 1020         PROVIDERS.catalog_api.create_service(service['id'], service)
 1021 
 1022         endpoint = unit.new_endpoint_ref(url='', service_id=service['id'],
 1023                                          region_id=None)
 1024         PROVIDERS.catalog_api.create_endpoint(endpoint['id'], endpoint.copy())
 1025 
 1026         catalog = PROVIDERS.catalog_api.get_v3_catalog(self.user_foo['id'],
 1027                                                        self.project_bar['id'])
 1028         catalog_endpoint = catalog[0]
 1029         self.assertEqual(service['name'], catalog_endpoint['name'])
 1030         self.assertEqual(service['id'], catalog_endpoint['id'])
 1031         self.assertEqual([], catalog_endpoint['endpoints'])
 1032 
 1033     def test_create_endpoint_region_returns_not_found(self):
 1034         service = unit.new_service_ref()
 1035         PROVIDERS.catalog_api.create_service(service['id'], service)
 1036 
 1037         endpoint = unit.new_endpoint_ref(region_id=uuid.uuid4().hex,
 1038                                          service_id=service['id'])
 1039 
 1040         self.assertRaises(exception.ValidationError,
 1041                           PROVIDERS.catalog_api.create_endpoint,
 1042                           endpoint['id'],
 1043                           endpoint.copy())
 1044 
 1045     def test_create_region_invalid_id(self):
 1046         region = unit.new_region_ref(id='0' * 256)
 1047 
 1048         self.assertRaises(exception.StringLengthExceeded,
 1049                           PROVIDERS.catalog_api.create_region,
 1050                           region)
 1051 
 1052     def test_create_region_invalid_parent_id(self):
 1053         region = unit.new_region_ref(parent_region_id='0' * 256)
 1054 
 1055         self.assertRaises(exception.RegionNotFound,
 1056                           PROVIDERS.catalog_api.create_region,
 1057                           region)
 1058 
 1059     def test_delete_region_with_endpoint(self):
 1060         # create a region
 1061         region = unit.new_region_ref()
 1062         PROVIDERS.catalog_api.create_region(region)
 1063 
 1064         # create a child region
 1065         child_region = unit.new_region_ref(parent_region_id=region['id'])
 1066         PROVIDERS.catalog_api.create_region(child_region)
 1067         # create a service
 1068         service = unit.new_service_ref()
 1069         PROVIDERS.catalog_api.create_service(service['id'], service)
 1070 
 1071         # create an endpoint attached to the service and child region
 1072         child_endpoint = unit.new_endpoint_ref(region_id=child_region['id'],
 1073                                                service_id=service['id'])
 1074 
 1075         PROVIDERS.catalog_api.create_endpoint(
 1076             child_endpoint['id'], child_endpoint
 1077         )
 1078         self.assertRaises(exception.RegionDeletionError,
 1079                           PROVIDERS.catalog_api.delete_region,
 1080                           child_region['id'])
 1081 
 1082         # create an endpoint attached to the service and parent region
 1083         endpoint = unit.new_endpoint_ref(region_id=region['id'],
 1084                                          service_id=service['id'])
 1085 
 1086         PROVIDERS.catalog_api.create_endpoint(endpoint['id'], endpoint)
 1087         self.assertRaises(exception.RegionDeletionError,
 1088                           PROVIDERS.catalog_api.delete_region,
 1089                           region['id'])
 1090 
 1091     def test_v3_catalog_domain_scoped_token(self):
 1092         # test the case that project_id is None.
 1093         srv_1 = unit.new_service_ref()
 1094         PROVIDERS.catalog_api.create_service(srv_1['id'], srv_1)
 1095         endpoint_1 = unit.new_endpoint_ref(service_id=srv_1['id'],
 1096                                            region_id=None)
 1097         PROVIDERS.catalog_api.create_endpoint(endpoint_1['id'], endpoint_1)
 1098 
 1099         srv_2 = unit.new_service_ref()
 1100         PROVIDERS.catalog_api.create_service(srv_2['id'], srv_2)
 1101         endpoint_2 = unit.new_endpoint_ref(service_id=srv_2['id'],
 1102                                            region_id=None)
 1103         PROVIDERS.catalog_api.create_endpoint(endpoint_2['id'], endpoint_2)
 1104 
 1105         self.config_fixture.config(group='endpoint_filter',
 1106                                    return_all_endpoints_if_no_filter=True)
 1107         catalog_ref = PROVIDERS.catalog_api.get_v3_catalog(
 1108             uuid.uuid4().hex, None
 1109         )
 1110         self.assertThat(catalog_ref, matchers.HasLength(2))
 1111         self.config_fixture.config(group='endpoint_filter',
 1112                                    return_all_endpoints_if_no_filter=False)
 1113         catalog_ref = PROVIDERS.catalog_api.get_v3_catalog(
 1114             uuid.uuid4().hex, None
 1115         )
 1116         self.assertThat(catalog_ref, matchers.HasLength(0))
 1117 
 1118     def test_v3_catalog_endpoint_filter_enabled(self):
 1119         srv_1 = unit.new_service_ref()
 1120         PROVIDERS.catalog_api.create_service(srv_1['id'], srv_1)
 1121         endpoint_1 = unit.new_endpoint_ref(service_id=srv_1['id'],
 1122                                            region_id=None)
 1123         PROVIDERS.catalog_api.create_endpoint(endpoint_1['id'], endpoint_1)
 1124         endpoint_2 = unit.new_endpoint_ref(service_id=srv_1['id'],
 1125                                            region_id=None)
 1126         PROVIDERS.catalog_api.create_endpoint(endpoint_2['id'], endpoint_2)
 1127         # create endpoint-project association.
 1128         PROVIDERS.catalog_api.add_endpoint_to_project(
 1129             endpoint_1['id'],
 1130             self.project_bar['id'])
 1131 
 1132         catalog_ref = PROVIDERS.catalog_api.get_v3_catalog(
 1133             uuid.uuid4().hex, self.project_bar['id']
 1134         )
 1135         self.assertThat(catalog_ref, matchers.HasLength(1))
 1136         self.assertThat(catalog_ref[0]['endpoints'], matchers.HasLength(1))
 1137         # the endpoint is that defined in the endpoint-project association.
 1138         self.assertEqual(endpoint_1['id'],
 1139                          catalog_ref[0]['endpoints'][0]['id'])
 1140 
 1141     def test_v3_catalog_endpoint_filter_disabled(self):
 1142         # there is no endpoint-project association defined.
 1143         self.config_fixture.config(group='endpoint_filter',
 1144                                    return_all_endpoints_if_no_filter=True)
 1145         srv_1 = unit.new_service_ref()
 1146         PROVIDERS.catalog_api.create_service(srv_1['id'], srv_1)
 1147         endpoint_1 = unit.new_endpoint_ref(service_id=srv_1['id'],
 1148                                            region_id=None)
 1149         PROVIDERS.catalog_api.create_endpoint(endpoint_1['id'], endpoint_1)
 1150 
 1151         srv_2 = unit.new_service_ref()
 1152         PROVIDERS.catalog_api.create_service(srv_2['id'], srv_2)
 1153 
 1154         catalog_ref = PROVIDERS.catalog_api.get_v3_catalog(
 1155             uuid.uuid4().hex, self.project_bar['id']
 1156         )
 1157         self.assertThat(catalog_ref, matchers.HasLength(2))
 1158         srv_id_list = [catalog_ref[0]['id'], catalog_ref[1]['id']]
 1159         self.assertItemsEqual([srv_1['id'], srv_2['id']], srv_id_list)
 1160 
 1161 
 1162 class SqlPolicy(SqlTests, policy_tests.PolicyTests):
 1163     pass
 1164 
 1165 
 1166 class SqlInheritance(SqlTests, assignment_tests.InheritanceTests):
 1167     pass
 1168 
 1169 
 1170 class SqlImpliedRoles(SqlTests, assignment_tests.ImpliedRoleTests):
 1171     pass
 1172 
 1173 
 1174 class SqlFilterTests(SqlTests, identity_tests.FilterTests):
 1175 
 1176     def clean_up_entities(self):
 1177         """Clean up entity test data from Filter Test Cases."""
 1178         for entity in ['user', 'group', 'project']:
 1179             self._delete_test_data(entity, self.entity_list[entity])
 1180             self._delete_test_data(entity, self.domain1_entity_list[entity])
 1181         del self.entity_list
 1182         del self.domain1_entity_list
 1183         self.domain1['enabled'] = False
 1184         PROVIDERS.resource_api.update_domain(self.domain1['id'], self.domain1)
 1185         PROVIDERS.resource_api.delete_domain(self.domain1['id'])
 1186         del self.domain1
 1187 
 1188     def test_list_entities_filtered_by_domain(self):
 1189         # NOTE(henry-nash): This method is here rather than in
 1190         # unit.identity.test_backends since any domain filtering with LDAP is
 1191         # handled by the manager layer (and is already tested elsewhere) not at
 1192         # the driver level.
 1193         self.addCleanup(self.clean_up_entities)
 1194         self.domain1 = unit.new_domain_ref()
 1195         PROVIDERS.resource_api.create_domain(self.domain1['id'], self.domain1)
 1196 
 1197         self.entity_list = {}
 1198         self.domain1_entity_list = {}
 1199         for entity in ['user', 'group', 'project']:
 1200             # Create 5 entities, 3 of which are in domain1
 1201             DOMAIN1_ENTITIES = 3
 1202             self.entity_list[entity] = self._create_test_data(entity, 2)
 1203             self.domain1_entity_list[entity] = self._create_test_data(
 1204                 entity, DOMAIN1_ENTITIES, self.domain1['id'])
 1205 
 1206             # Should get back the DOMAIN1_ENTITIES in domain1
 1207             hints = driver_hints.Hints()
 1208             hints.add_filter('domain_id', self.domain1['id'])
 1209             entities = self._list_entities(entity)(hints=hints)
 1210             self.assertEqual(DOMAIN1_ENTITIES, len(entities))
 1211             self._match_with_list(entities, self.domain1_entity_list[entity])
 1212             # Check the driver has removed the filter from the list hints
 1213             self.assertFalse(hints.get_exact_filter_by_name('domain_id'))
 1214 
 1215     def test_filter_sql_injection_attack(self):
 1216         """Test against sql injection attack on filters.
 1217 
 1218         Test Plan:
 1219         - Attempt to get all entities back by passing a two-term attribute
 1220         - Attempt to piggyback filter to damage DB (e.g. drop table)
 1221 
 1222         """
 1223         # Check we have some users
 1224         users = PROVIDERS.identity_api.list_users()
 1225         self.assertGreater(len(users), 0)
 1226 
 1227         hints = driver_hints.Hints()
 1228         hints.add_filter('name', "anything' or 'x'='x")
 1229         users = PROVIDERS.identity_api.list_users(hints=hints)
 1230         self.assertEqual(0, len(users))
 1231 
 1232         # See if we can add a SQL command...use the group table instead of the
 1233         # user table since 'user' is reserved word for SQLAlchemy.
 1234         group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
 1235         group = PROVIDERS.identity_api.create_group(group)
 1236 
 1237         hints = driver_hints.Hints()
 1238         hints.add_filter('name', "x'; drop table group")
 1239         groups = PROVIDERS.identity_api.list_groups(hints=hints)
 1240         self.assertEqual(0, len(groups))
 1241 
 1242         groups = PROVIDERS.identity_api.list_groups()
 1243         self.assertGreater(len(groups), 0)
 1244 
 1245 
 1246 class SqlLimitTests(SqlTests, identity_tests.LimitTests):
 1247     def setUp(self):
 1248         super(SqlLimitTests, self).setUp()
 1249         identity_tests.LimitTests.setUp(self)
 1250 
 1251 
 1252 class FakeTable(sql.ModelBase):
 1253     __tablename__ = 'test_table'
 1254     col = sql.Column(sql.String(32), primary_key=True)
 1255 
 1256     @sql.handle_conflicts('keystone')
 1257     def insert(self):
 1258         raise db_exception.DBDuplicateEntry
 1259 
 1260     @sql.handle_conflicts('keystone')
 1261     def update(self):
 1262         raise db_exception.DBError(
 1263             inner_exception=exc.IntegrityError('a', 'a', 'a'))
 1264 
 1265     @sql.handle_conflicts('keystone')
 1266     def lookup(self):
 1267         raise KeyError
 1268 
 1269 
 1270 class SqlDecorators(unit.TestCase):
 1271 
 1272     def test_initialization_fail(self):
 1273         self.assertRaises(exception.StringLengthExceeded,
 1274                           FakeTable, col='a' * 64)
 1275 
 1276     def test_initialization(self):
 1277         tt = FakeTable(col='a')
 1278         self.assertEqual('a', tt.col)
 1279 
 1280     def test_conflict_happend(self):
 1281         self.assertRaises(exception.Conflict, FakeTable().insert)
 1282         self.assertRaises(exception.UnexpectedError, FakeTable().update)
 1283 
 1284     def test_not_conflict_error(self):
 1285         self.assertRaises(KeyError, FakeTable().lookup)
 1286 
 1287 
 1288 class SqlModuleInitialization(unit.TestCase):
 1289 
 1290     @mock.patch.object(sql.core, 'CONF')
 1291     @mock.patch.object(options, 'set_defaults')
 1292     def test_initialize_module(self, set_defaults, CONF):
 1293         sql.initialize()
 1294         set_defaults.assert_called_with(CONF,
 1295                                         connection='sqlite:///keystone.db')
 1296 
 1297 
 1298 class SqlCredential(SqlTests):
 1299 
 1300     def _create_credential_with_user_id(self, user_id=uuid.uuid4().hex):
 1301         credential = unit.new_credential_ref(user_id=user_id,
 1302                                              extra=uuid.uuid4().hex,
 1303                                              type=uuid.uuid4().hex)
 1304         PROVIDERS.credential_api.create_credential(
 1305             credential['id'], credential
 1306         )
 1307         return credential
 1308 
 1309     def _validateCredentialList(self, retrieved_credentials,
 1310                                 expected_credentials):
 1311         self.assertEqual(len(expected_credentials), len(retrieved_credentials))
 1312         retrived_ids = [c['id'] for c in retrieved_credentials]
 1313         for cred in expected_credentials:
 1314             self.assertIn(cred['id'], retrived_ids)
 1315 
 1316     def setUp(self):
 1317         self.useFixture(database.Database())
 1318         super(SqlCredential, self).setUp()
 1319         self.useFixture(
 1320             ksfixtures.KeyRepository(
 1321                 self.config_fixture,
 1322                 'credential',
 1323                 credential_provider.MAX_ACTIVE_KEYS
 1324             )
 1325         )
 1326 
 1327         self.credentials = []
 1328         for _ in range(3):
 1329             self.credentials.append(
 1330                 self._create_credential_with_user_id())
 1331         self.user_credentials = []
 1332         for _ in range(3):
 1333             cred = self._create_credential_with_user_id(self.user_foo['id'])
 1334             self.user_credentials.append(cred)
 1335             self.credentials.append(cred)
 1336 
 1337     def test_list_credentials(self):
 1338         credentials = PROVIDERS.credential_api.list_credentials()
 1339         self._validateCredentialList(credentials, self.credentials)
 1340         # test filtering using hints
 1341         hints = driver_hints.Hints()
 1342         hints.add_filter('user_id', self.user_foo['id'])
 1343         credentials = PROVIDERS.credential_api.list_credentials(hints)
 1344         self._validateCredentialList(credentials, self.user_credentials)
 1345 
 1346     def test_list_credentials_for_user(self):
 1347         credentials = PROVIDERS.credential_api.list_credentials_for_user(
 1348             self.user_foo['id'])
 1349         self._validateCredentialList(credentials, self.user_credentials)
 1350 
 1351     def test_list_credentials_for_user_and_type(self):
 1352         cred = self.user_credentials[0]
 1353         credentials = PROVIDERS.credential_api.list_credentials_for_user(
 1354             self.user_foo['id'], type=cred['type'])
 1355         self._validateCredentialList(credentials, [cred])
 1356 
 1357     def test_create_credential_is_encrypted_when_stored(self):
 1358         credential = unit.new_credential_ref(user_id=uuid.uuid4().hex)
 1359         credential_id = credential['id']
 1360         returned_credential = PROVIDERS.credential_api.create_credential(
 1361             credential_id,
 1362             credential
 1363         )
 1364 
 1365         # Make sure the `blob` is *not* encrypted when returned from the
 1366         # credential API.
 1367         self.assertEqual(returned_credential['blob'], credential['blob'])
 1368 
 1369         credential_from_backend = (
 1370             PROVIDERS.credential_api.driver.get_credential(credential_id)
 1371         )
 1372 
 1373         # Pull the credential directly from the backend, the `blob` should be
 1374         # encrypted.
 1375         self.assertNotEqual(
 1376             credential_from_backend['encrypted_blob'],
 1377             credential['blob']
 1378         )
 1379 
 1380     def test_list_credentials_is_decrypted(self):
 1381         credential = unit.new_credential_ref(user_id=uuid.uuid4().hex)
 1382         credential_id = credential['id']
 1383 
 1384         created_credential = PROVIDERS.credential_api.create_credential(
 1385             credential_id,
 1386             credential
 1387         )
 1388 
 1389         # Pull the credential directly from the backend, the `blob` should be
 1390         # encrypted.
 1391         credential_from_backend = (
 1392             PROVIDERS.credential_api.driver.get_credential(credential_id)
 1393         )
 1394         self.assertNotEqual(
 1395             credential_from_backend['encrypted_blob'],
 1396             credential['blob']
 1397         )
 1398 
 1399         # Make sure the `blob` values listed from the API are not encrypted.
 1400         listed_credentials = PROVIDERS.credential_api.list_credentials()
 1401         self.assertIn(created_credential, listed_credentials)
 1402 
 1403 
 1404 class SqlRegisteredLimit(SqlTests, limit_tests.RegisteredLimitTests):
 1405 
 1406     def setUp(self):
 1407         super(SqlRegisteredLimit, self).setUp()
 1408 
 1409         fixtures_to_cleanup = []
 1410         for service in default_fixtures.SERVICES:
 1411             service_id = service['id']
 1412             rv = PROVIDERS.catalog_api.create_service(service_id, service)
 1413             attrname = service['extra']['name']
 1414             setattr(self, attrname, rv)
 1415             fixtures_to_cleanup.append(attrname)
 1416         for region in default_fixtures.REGIONS:
 1417             rv = PROVIDERS.catalog_api.create_region(region)
 1418             attrname = region['id']
 1419             setattr(self, attrname, rv)
 1420             fixtures_to_cleanup.append(attrname)
 1421         self.addCleanup(self.cleanup_instance(*fixtures_to_cleanup))
 1422 
 1423 
 1424 class SqlLimit(SqlTests, limit_tests.LimitTests):
 1425 
 1426     def setUp(self):
 1427         super(SqlLimit, self).setUp()
 1428 
 1429         fixtures_to_cleanup = []
 1430         for service in default_fixtures.SERVICES:
 1431             service_id = service['id']
 1432             rv = PROVIDERS.catalog_api.create_service(service_id, service)
 1433             attrname = service['extra']['name']
 1434             setattr(self, attrname, rv)
 1435             fixtures_to_cleanup.append(attrname)
 1436         for region in default_fixtures.REGIONS:
 1437             rv = PROVIDERS.catalog_api.create_region(region)
 1438             attrname = region['id']
 1439             setattr(self, attrname, rv)
 1440             fixtures_to_cleanup.append(attrname)
 1441         self.addCleanup(self.cleanup_instance(*fixtures_to_cleanup))
 1442 
 1443         registered_limit_1 = unit.new_registered_limit_ref(
 1444             service_id=self.service_one['id'],
 1445             region_id=self.region_one['id'],
 1446             resource_name='volume', default_limit=10, id=uuid.uuid4().hex)
 1447         registered_limit_2 = unit.new_registered_limit_ref(
 1448             service_id=self.service_one['id'],
 1449             region_id=self.region_two['id'],
 1450             resource_name='snapshot', default_limit=10, id=uuid.uuid4().hex)
 1451         registered_limit_3 = unit.new_registered_limit_ref(
 1452             service_id=self.service_one['id'],
 1453             region_id=self.region_two['id'],
 1454             resource_name='backup', default_limit=10, id=uuid.uuid4().hex)
 1455         PROVIDERS.unified_limit_api.create_registered_limits(
 1456             [registered_limit_1, registered_limit_2, registered_limit_3])