"Fossies" - the Fresh Open Source Software Archive

Member "keystone-16.0.2/keystone/tests/unit/identity/backends/test_ldap_common.py" (7 Jun 2021, 25903 Bytes) of package /linux/misc/openstack/keystone-16.0.2.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_ldap_common.py": 16.0.1_vs_16.0.2.

    1 # -*- coding: utf-8 -*-
    2 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    3 # not use this file except in compliance with the License. You may obtain
    4 # a copy of the License at
    5 #
    6 #      http://www.apache.org/licenses/LICENSE-2.0
    7 #
    8 # Unless required by applicable law or agreed to in writing, software
    9 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   10 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   11 # License for the specific language governing permissions and limitations
   12 # under the License.
   13 
   14 import os
   15 import tempfile
   16 import uuid
   17 
   18 import fixtures
   19 import ldap.dn
   20 import mock
   21 from oslo_config import fixture as config_fixture
   22 
   23 from keystone.common import driver_hints
   24 from keystone.common import provider_api
   25 import keystone.conf
   26 from keystone import exception as ks_exception
   27 from keystone.identity.backends.ldap import common as common_ldap
   28 from keystone.tests import unit
   29 from keystone.tests.unit import default_fixtures
   30 from keystone.tests.unit import fakeldap
   31 from keystone.tests.unit.ksfixtures import database
   32 from keystone.tests.unit.ksfixtures import ldapdb
   33 
   34 
# Module-level alias for keystone's configuration object; the tests below
# hand it to common_ldap.BaseLdap and to the oslo.config fixture.
CONF = keystone.conf.CONF
# Alias for the provider API registry; tests reach backends through it
# (for example PROVIDERS.identity_api).
PROVIDERS = provider_api.ProviderAPIs
   37 
   38 
   39 class DnCompareTest(unit.BaseTestCase):
   40     """Test for the DN comparison functions in keystone.common.ldap.core."""
   41 
   42     def test_prep(self):
   43         # prep_case_insensitive returns the string with spaces at the front and
   44         # end if it's already lowercase and no insignificant characters.
   45         value = 'lowercase value'
   46         self.assertEqual(value, common_ldap.prep_case_insensitive(value))
   47 
   48     def test_prep_lowercase(self):
   49         # prep_case_insensitive returns the string with spaces at the front and
   50         # end and lowercases the value.
   51         value = 'UPPERCASE VALUE'
   52         exp_value = value.lower()
   53         self.assertEqual(exp_value, common_ldap.prep_case_insensitive(value))
   54 
   55     def test_prep_insignificant(self):
   56         # prep_case_insensitive remove insignificant spaces.
   57         value = 'before   after'
   58         exp_value = 'before after'
   59         self.assertEqual(exp_value, common_ldap.prep_case_insensitive(value))
   60 
   61     def test_prep_insignificant_pre_post(self):
   62         # prep_case_insensitive remove insignificant spaces.
   63         value = '   value   '
   64         exp_value = 'value'
   65         self.assertEqual(exp_value, common_ldap.prep_case_insensitive(value))
   66 
   67     def test_ava_equal_same(self):
   68         # is_ava_value_equal returns True if the two values are the same.
   69         value = 'val1'
   70         self.assertTrue(common_ldap.is_ava_value_equal('cn', value, value))
   71 
   72     def test_ava_equal_complex(self):
   73         # is_ava_value_equal returns True if the two values are the same using
   74         # a value that's got different capitalization and insignificant chars.
   75         val1 = 'before   after'
   76         val2 = '  BEFORE  afTer '
   77         self.assertTrue(common_ldap.is_ava_value_equal('cn', val1, val2))
   78 
   79     def test_ava_different(self):
   80         # is_ava_value_equal returns False if the values aren't the same.
   81         self.assertFalse(common_ldap.is_ava_value_equal('cn', 'val1', 'val2'))
   82 
   83     def test_rdn_same(self):
   84         # is_rdn_equal returns True if the two values are the same.
   85         rdn = ldap.dn.str2dn('cn=val1')[0]
   86         self.assertTrue(common_ldap.is_rdn_equal(rdn, rdn))
   87 
   88     def test_rdn_diff_length(self):
   89         # is_rdn_equal returns False if the RDNs have a different number of
   90         # AVAs.
   91         rdn1 = ldap.dn.str2dn('cn=cn1')[0]
   92         rdn2 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
   93         self.assertFalse(common_ldap.is_rdn_equal(rdn1, rdn2))
   94 
   95     def test_rdn_multi_ava_same_order(self):
   96         # is_rdn_equal returns True if the RDNs have the same number of AVAs
   97         # and the values are the same.
   98         rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
   99         rdn2 = ldap.dn.str2dn('cn=CN1+ou=OU1')[0]
  100         self.assertTrue(common_ldap.is_rdn_equal(rdn1, rdn2))
  101 
  102     def test_rdn_multi_ava_diff_order(self):
  103         # is_rdn_equal returns True if the RDNs have the same number of AVAs
  104         # and the values are the same, even if in a different order
  105         rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
  106         rdn2 = ldap.dn.str2dn('ou=OU1+cn=CN1')[0]
  107         self.assertTrue(common_ldap.is_rdn_equal(rdn1, rdn2))
  108 
  109     def test_rdn_multi_ava_diff_type(self):
  110         # is_rdn_equal returns False if the RDNs have the same number of AVAs
  111         # and the attribute types are different.
  112         rdn1 = ldap.dn.str2dn('cn=cn1+ou=ou1')[0]
  113         rdn2 = ldap.dn.str2dn('cn=cn1+sn=sn1')[0]
  114         self.assertFalse(common_ldap.is_rdn_equal(rdn1, rdn2))
  115 
  116     def test_rdn_attr_type_case_diff(self):
  117         # is_rdn_equal returns True for same RDNs even when attr type case is
  118         # different.
  119         rdn1 = ldap.dn.str2dn('cn=cn1')[0]
  120         rdn2 = ldap.dn.str2dn('CN=cn1')[0]
  121         self.assertTrue(common_ldap.is_rdn_equal(rdn1, rdn2))
  122 
  123     def test_rdn_attr_type_alias(self):
  124         # is_rdn_equal returns False for same RDNs even when attr type alias is
  125         # used. Note that this is a limitation since an LDAP server should
  126         # consider them equal.
  127         rdn1 = ldap.dn.str2dn('cn=cn1')[0]
  128         rdn2 = ldap.dn.str2dn('2.5.4.3=cn1')[0]
  129         self.assertFalse(common_ldap.is_rdn_equal(rdn1, rdn2))
  130 
  131     def test_dn_same(self):
  132         # is_dn_equal returns True if the DNs are the same.
  133         dn = 'cn=Babs Jansen,ou=OpenStack'
  134         self.assertTrue(common_ldap.is_dn_equal(dn, dn))
  135 
  136     def test_dn_equal_unicode(self):
  137         # is_dn_equal can accept unicode
  138         dn = u'cn=fäké,ou=OpenStack'
  139         self.assertTrue(common_ldap.is_dn_equal(dn, dn))
  140 
  141     def test_dn_diff_length(self):
  142         # is_dn_equal returns False if the DNs don't have the same number of
  143         # RDNs
  144         dn1 = 'cn=Babs Jansen,ou=OpenStack'
  145         dn2 = 'cn=Babs Jansen,ou=OpenStack,dc=example.com'
  146         self.assertFalse(common_ldap.is_dn_equal(dn1, dn2))
  147 
  148     def test_dn_equal_rdns(self):
  149         # is_dn_equal returns True if the DNs have the same number of RDNs
  150         # and each RDN is the same.
  151         dn1 = 'cn=Babs Jansen,ou=OpenStack+cn=OpenSource'
  152         dn2 = 'CN=Babs Jansen,cn=OpenSource+ou=OpenStack'
  153         self.assertTrue(common_ldap.is_dn_equal(dn1, dn2))
  154 
  155     def test_dn_parsed_dns(self):
  156         # is_dn_equal can also accept parsed DNs.
  157         dn_str1 = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack+cn=OpenSource')
  158         dn_str2 = ldap.dn.str2dn('CN=Babs Jansen,cn=OpenSource+ou=OpenStack')
  159         self.assertTrue(common_ldap.is_dn_equal(dn_str1, dn_str2))
  160 
  161     def test_startswith_under_child(self):
  162         # dn_startswith returns True if descendant_dn is a child of dn.
  163         child = 'cn=Babs Jansen,ou=OpenStack'
  164         parent = 'ou=OpenStack'
  165         self.assertTrue(common_ldap.dn_startswith(child, parent))
  166 
  167     def test_startswith_parent(self):
  168         # dn_startswith returns False if descendant_dn is a parent of dn.
  169         child = 'cn=Babs Jansen,ou=OpenStack'
  170         parent = 'ou=OpenStack'
  171         self.assertFalse(common_ldap.dn_startswith(parent, child))
  172 
  173     def test_startswith_same(self):
  174         # dn_startswith returns False if DNs are the same.
  175         dn = 'cn=Babs Jansen,ou=OpenStack'
  176         self.assertFalse(common_ldap.dn_startswith(dn, dn))
  177 
  178     def test_startswith_not_parent(self):
  179         # dn_startswith returns False if descendant_dn is not under the dn
  180         child = 'cn=Babs Jansen,ou=OpenStack'
  181         parent = 'dc=example.com'
  182         self.assertFalse(common_ldap.dn_startswith(child, parent))
  183 
  184     def test_startswith_descendant(self):
  185         # dn_startswith returns True if descendant_dn is a descendant of dn.
  186         descendant = 'cn=Babs Jansen,ou=Keystone,ou=OpenStack,dc=example.com'
  187         dn = 'ou=OpenStack,dc=example.com'
  188         self.assertTrue(common_ldap.dn_startswith(descendant, dn))
  189 
  190         descendant = 'uid=12345,ou=Users,dc=example,dc=com'
  191         dn = 'ou=Users,dc=example,dc=com'
  192         self.assertTrue(common_ldap.dn_startswith(descendant, dn))
  193 
  194     def test_startswith_parsed_dns(self):
  195         # dn_startswith also accepts parsed DNs.
  196         descendant = ldap.dn.str2dn('cn=Babs Jansen,ou=OpenStack')
  197         dn = ldap.dn.str2dn('ou=OpenStack')
  198         self.assertTrue(common_ldap.dn_startswith(descendant, dn))
  199 
  200     def test_startswith_unicode(self):
  201         # dn_startswith accepts unicode.
  202         child = u'cn=fäké,ou=OpenStäck'
  203         parent = u'ou=OpenStäck'
  204         self.assertTrue(common_ldap.dn_startswith(child, parent))
  205 
  206 
  207 class LDAPDeleteTreeTest(unit.TestCase):
  208 
  209     def setUp(self):
  210         super(LDAPDeleteTreeTest, self).setUp()
  211 
  212         self.useFixture(
  213             ldapdb.LDAPDatabase(dbclass=fakeldap.FakeLdapNoSubtreeDelete))
  214         self.useFixture(database.Database())
  215 
  216         self.load_backends()
  217         self.load_fixtures(default_fixtures)
  218 
  219     def config_overrides(self):
  220         super(LDAPDeleteTreeTest, self).config_overrides()
  221         self.config_fixture.config(group='identity', driver='ldap')
  222 
  223     def config_files(self):
  224         config_files = super(LDAPDeleteTreeTest, self).config_files()
  225         config_files.append(unit.dirs.tests_conf('backend_ldap.conf'))
  226         return config_files
  227 
  228 
  229 class MultiURLTests(unit.TestCase):
  230     """Test for setting multiple LDAP URLs."""
  231 
  232     @mock.patch.object(common_ldap.KeystoneLDAPHandler, 'simple_bind_s')
  233     def test_multiple_urls_with_comma_no_conn_pool(self, mock_ldap_bind):
  234         urls = 'ldap://localhost,ldap://backup.localhost'
  235         self.config_fixture.config(group='ldap', url=urls, use_pool=False)
  236         base_ldap = common_ldap.BaseLdap(CONF)
  237         ldap_connection = base_ldap.get_connection()
  238         self.assertEqual(urls, ldap_connection.conn.conn._uri)
  239 
  240     @mock.patch.object(common_ldap.KeystoneLDAPHandler, 'simple_bind_s')
  241     def test_multiple_urls_with_comma_with_conn_pool(self, mock_ldap_bind):
  242         urls = 'ldap://localhost,ldap://backup.localhost'
  243         self.config_fixture.config(group='ldap', url=urls, use_pool=True)
  244         base_ldap = common_ldap.BaseLdap(CONF)
  245         ldap_connection = base_ldap.get_connection()
  246         self.assertEqual(urls, ldap_connection.conn.conn_pool.uri)
  247 
  248 
  249 class LDAPConnectionTimeoutTest(unit.TestCase):
  250     """Test for Network Connection timeout on LDAP URL connection."""
  251 
  252     @mock.patch.object(common_ldap.KeystoneLDAPHandler, 'simple_bind_s')
  253     def test_connectivity_timeout_no_conn_pool(self, mock_ldap_bind):
  254         url = 'ldap://localhost'
  255         conn_timeout = 1  # 1 second
  256         self.config_fixture.config(group='ldap',
  257                                    url=url,
  258                                    connection_timeout=conn_timeout,
  259                                    use_pool=False)
  260         base_ldap = common_ldap.BaseLdap(CONF)
  261         ldap_connection = base_ldap.get_connection()
  262         self.assertIsInstance(ldap_connection.conn,
  263                               common_ldap.PythonLDAPHandler)
  264 
  265         # Ensure that the Network Timeout option is set.
  266         # Also ensure that the URL is set.
  267         #
  268         # We will not verify if an LDAP bind returns the timeout
  269         # exception as that would fall under the realm of
  270         # integration testing. If the LDAP option is set properly,
  271         # and we get back a valid connection URI then that should
  272         # suffice for this unit test.
  273         self.assertEqual(conn_timeout,
  274                          ldap.get_option(ldap.OPT_NETWORK_TIMEOUT))
  275         self.assertEqual(url, ldap_connection.conn.conn._uri)
  276 
  277     @mock.patch.object(common_ldap.KeystoneLDAPHandler, 'simple_bind_s')
  278     def test_connectivity_timeout_with_conn_pool(self, mock_ldap_bind):
  279         url = 'ldap://localhost'
  280         conn_timeout = 1  # 1 second
  281         self.config_fixture.config(group='ldap',
  282                                    url=url,
  283                                    pool_connection_timeout=conn_timeout,
  284                                    use_pool=True,
  285                                    pool_retry_max=1)
  286         base_ldap = common_ldap.BaseLdap(CONF)
  287         ldap_connection = base_ldap.get_connection()
  288         self.assertIsInstance(ldap_connection.conn,
  289                               common_ldap.PooledLDAPHandler)
  290 
  291         # Ensure that the Network Timeout option is set.
  292         # Also ensure that the URL is set.
  293         #
  294         # We will not verify if an LDAP bind returns the timeout
  295         # exception as that would fall under the realm of
  296         # integration testing. If the LDAP option is set properly,
  297         # and we get back a valid connection URI then that should
  298         # suffice for this unit test.
  299         self.assertEqual(conn_timeout,
  300                          ldap.get_option(ldap.OPT_NETWORK_TIMEOUT))
  301         self.assertEqual(url, ldap_connection.conn.conn_pool.uri)
  302 
  303 
  304 class SslTlsTest(unit.BaseTestCase):
  305     """Test for the SSL/TLS functionality in keystone.common.ldap.core."""
  306 
  307     def setUp(self):
  308         super(SslTlsTest, self).setUp()
  309         self.config_fixture = self.useFixture(config_fixture.Config(CONF))
  310 
  311     @mock.patch.object(common_ldap.KeystoneLDAPHandler, 'simple_bind_s')
  312     @mock.patch.object(ldap.ldapobject.LDAPObject, 'start_tls_s')
  313     def _init_ldap_connection(self, config, mock_ldap_one, mock_ldap_two):
  314         # Attempt to connect to initialize python-ldap.
  315         base_ldap = common_ldap.BaseLdap(config)
  316         base_ldap.get_connection()
  317 
  318     def test_certfile_trust_tls(self):
  319         # We need this to actually exist, so we create a tempfile.
  320         (handle, certfile) = tempfile.mkstemp()
  321         self.addCleanup(os.unlink, certfile)
  322         self.addCleanup(os.close, handle)
  323         self.config_fixture.config(group='ldap',
  324                                    url='ldap://localhost',
  325                                    use_tls=True,
  326                                    tls_cacertfile=certfile)
  327 
  328         self._init_ldap_connection(CONF)
  329 
  330         # Ensure the cert trust option is set.
  331         self.assertEqual(certfile, ldap.get_option(ldap.OPT_X_TLS_CACERTFILE))
  332 
  333     def test_certdir_trust_tls(self):
  334         # We need this to actually exist, so we create a tempdir.
  335         certdir = self.useFixture(fixtures.TempDir()).path
  336         self.config_fixture.config(group='ldap',
  337                                    url='ldap://localhost',
  338                                    use_tls=True,
  339                                    tls_cacertdir=certdir)
  340 
  341         self._init_ldap_connection(CONF)
  342 
  343         # Ensure the cert trust option is set.
  344         self.assertEqual(certdir, ldap.get_option(ldap.OPT_X_TLS_CACERTDIR))
  345 
  346     def test_certfile_trust_ldaps(self):
  347         # We need this to actually exist, so we create a tempfile.
  348         (handle, certfile) = tempfile.mkstemp()
  349         self.addCleanup(os.unlink, certfile)
  350         self.addCleanup(os.close, handle)
  351         self.config_fixture.config(group='ldap',
  352                                    url='ldaps://localhost',
  353                                    use_tls=False,
  354                                    tls_cacertfile=certfile)
  355 
  356         self._init_ldap_connection(CONF)
  357 
  358         # Ensure the cert trust option is set.
  359         self.assertEqual(certfile, ldap.get_option(ldap.OPT_X_TLS_CACERTFILE))
  360 
  361     def test_certdir_trust_ldaps(self):
  362         # We need this to actually exist, so we create a tempdir.
  363         certdir = self.useFixture(fixtures.TempDir()).path
  364         self.config_fixture.config(group='ldap',
  365                                    url='ldaps://localhost',
  366                                    use_tls=False,
  367                                    tls_cacertdir=certdir)
  368 
  369         self._init_ldap_connection(CONF)
  370 
  371         # Ensure the cert trust option is set.
  372         self.assertEqual(certdir, ldap.get_option(ldap.OPT_X_TLS_CACERTDIR))
  373 
  374 
  375 class LDAPPagedResultsTest(unit.TestCase):
  376     """Test the paged results functionality in keystone.common.ldap.core."""
  377 
  378     def setUp(self):
  379         super(LDAPPagedResultsTest, self).setUp()
  380 
  381         self.useFixture(ldapdb.LDAPDatabase())
  382         self.useFixture(database.Database())
  383 
  384         self.load_backends()
  385         self.load_fixtures(default_fixtures)
  386 
  387     def config_overrides(self):
  388         super(LDAPPagedResultsTest, self).config_overrides()
  389         self.config_fixture.config(group='identity', driver='ldap')
  390 
  391     def config_files(self):
  392         config_files = super(LDAPPagedResultsTest, self).config_files()
  393         config_files.append(unit.dirs.tests_conf('backend_ldap.conf'))
  394         return config_files
  395 
  396     @mock.patch.object(fakeldap.FakeLdap, 'search_ext')
  397     @mock.patch.object(fakeldap.FakeLdap, 'result3')
  398     def test_paged_results_control_api(self, mock_result3, mock_search_ext):
  399         mock_result3.return_value = ('', [], 1, [])
  400 
  401         self.config_fixture.config(group='ldap',
  402                                    page_size=1)
  403 
  404         conn = PROVIDERS.identity_api.user.get_connection()
  405         conn._paged_search_s('dc=example,dc=test',
  406                              ldap.SCOPE_SUBTREE,
  407                              'objectclass=*',
  408                              ['mail', 'userPassword'])
  409         # verify search_ext() args - attrlist is tricky due to ordering
  410         args, _ = mock_search_ext.call_args
  411         self.assertEqual(
  412             ('dc=example,dc=test', 2, 'objectclass=*'), args[0:3])
  413         attrlist = sorted([attr for attr in args[3] if attr])
  414         self.assertEqual(['mail', 'userPassword'], attrlist)
  415 
  416 
  417 class CommonLdapTestCase(unit.BaseTestCase):
  418     """These test cases call functions in keystone.common.ldap."""
  419 
  420     def test_binary_attribute_values(self):
  421         result = [(
  422             'cn=junk,dc=example,dc=com',
  423             {
  424                 'cn': ['junk'],
  425                 'sn': [uuid.uuid4().hex],
  426                 'mail': [uuid.uuid4().hex],
  427                 'binary_attr': [b'\x00\xFF\x00\xFF']
  428             }
  429         ), ]
  430         py_result = common_ldap.convert_ldap_result(result)
  431         # The attribute containing the binary value should
  432         # not be present in the converted result.
  433         self.assertNotIn('binary_attr', py_result[0][1])
  434 
  435     def test_utf8_conversion(self):
  436         value_unicode = u'fäké1'
  437         value_utf8 = value_unicode.encode('utf-8')
  438 
  439         result_utf8 = common_ldap.utf8_encode(value_unicode)
  440         self.assertEqual(value_utf8, result_utf8)
  441 
  442         result_utf8 = common_ldap.utf8_encode(value_utf8)
  443         self.assertEqual(value_utf8, result_utf8)
  444 
  445         result_unicode = common_ldap.utf8_decode(value_utf8)
  446         self.assertEqual(value_unicode, result_unicode)
  447 
  448         result_unicode = common_ldap.utf8_decode(value_unicode)
  449         self.assertEqual(value_unicode, result_unicode)
  450 
  451         self.assertRaises(TypeError,
  452                           common_ldap.utf8_encode,
  453                           100)
  454 
  455         result_unicode = common_ldap.utf8_decode(100)
  456         self.assertEqual(u'100', result_unicode)
  457 
  458     def test_user_id_begins_with_0(self):
  459         user_id = '0123456'
  460         result = [(
  461             'cn=dummy,dc=example,dc=com',
  462             {
  463                 'user_id': [user_id],
  464                 'enabled': ['TRUE']
  465             }
  466         ), ]
  467         py_result = common_ldap.convert_ldap_result(result)
  468         # The user id should be 0123456, and the enabled
  469         # flag should be True
  470         self.assertIs(True, py_result[0][1]['enabled'][0])
  471         self.assertEqual(user_id, py_result[0][1]['user_id'][0])
  472 
  473     def test_user_id_begins_with_0_and_enabled_bit_mask(self):
  474         user_id = '0123456'
  475         bitmask = '225'
  476         expected_bitmask = 225
  477         result = [(
  478             'cn=dummy,dc=example,dc=com',
  479             {
  480                 'user_id': [user_id],
  481                 'enabled': [bitmask]
  482             }
  483         ), ]
  484         py_result = common_ldap.convert_ldap_result(result)
  485         # The user id should be 0123456, and the enabled
  486         # flag should be 225
  487         self.assertEqual(expected_bitmask, py_result[0][1]['enabled'][0])
  488         self.assertEqual(user_id, py_result[0][1]['user_id'][0])
  489 
  490     def test_user_id_and_bitmask_begins_with_0(self):
  491         user_id = '0123456'
  492         bitmask = '0225'
  493         expected_bitmask = 225
  494         result = [(
  495             'cn=dummy,dc=example,dc=com',
  496             {
  497                 'user_id': [user_id],
  498                 'enabled': [bitmask]
  499             }
  500         ), ]
  501         py_result = common_ldap.convert_ldap_result(result)
  502         # The user id should be 0123456, and the enabled
  503         # flag should be 225, the 0 is dropped.
  504         self.assertEqual(expected_bitmask, py_result[0][1]['enabled'][0])
  505         self.assertEqual(user_id, py_result[0][1]['user_id'][0])
  506 
  507     def test_user_id_and_user_name_with_boolean_string(self):
  508         boolean_strings = ['TRUE', 'FALSE', 'true', 'false', 'True', 'False',
  509                            'TrUe' 'FaLse']
  510         for user_name in boolean_strings:
  511             user_id = uuid.uuid4().hex
  512             result = [(
  513                 'cn=dummy,dc=example,dc=com',
  514                 {
  515                     'user_id': [user_id],
  516                     'user_name': [user_name]
  517                 }
  518             ), ]
  519             py_result = common_ldap.convert_ldap_result(result)
  520             # The user name should still be a string value.
  521             self.assertEqual(user_name, py_result[0][1]['user_name'][0])
  522 
  523     def test_user_id_attribute_is_uuid_in_byte_form(self):
  524         results = [(
  525             'cn=alice,dc=example,dc=com',
  526             {
  527                 'cn': [b'cn=alice'],
  528                 'objectGUID': [b'\xdd\xd8Rt\xee]bA\x8e(\xe39\x0b\xe1\xf8\xe8'],
  529                 'email': [uuid.uuid4().hex],
  530                 'sn': [uuid.uuid4().hex]
  531             }
  532         )]
  533         py_result = common_ldap.convert_ldap_result(results)
  534         exp_object_guid = '7452d8dd-5dee-4162-8e28-e3390be1f8e8'
  535         self.assertEqual(exp_object_guid, py_result[0][1]['objectGUID'][0])
  536 
  537 
  538 class LDAPFilterQueryCompositionTest(unit.BaseTestCase):
  539     """These test cases test LDAP filter generation."""
  540 
  541     def setUp(self):
  542         super(LDAPFilterQueryCompositionTest, self).setUp()
  543         self.config_fixture = self.useFixture(config_fixture.Config(CONF))
  544 
  545         self.base_ldap = common_ldap.BaseLdap(self.config_fixture.conf)
  546 
  547         # The tests need an attribute mapping to use.
  548         self.attribute_name = uuid.uuid4().hex
  549         self.filter_attribute_name = uuid.uuid4().hex
  550         self.base_ldap.attribute_mapping = {
  551             self.attribute_name: self.filter_attribute_name
  552         }
  553 
  554     def test_return_query_with_no_hints(self):
  555         hints = driver_hints.Hints()
  556         # NOTE: doesn't have to be a real query, we just need to make sure the
  557         # same string is returned if there are no hints.
  558         query = uuid.uuid4().hex
  559         self.assertEqual(query,
  560                          self.base_ldap.filter_query(hints=hints, query=query))
  561 
  562         # make sure the default query is an empty string
  563         self.assertEqual('', self.base_ldap.filter_query(hints=hints))
  564 
  565     def test_filter_with_empty_query_and_hints_set(self):
  566         hints = driver_hints.Hints()
  567         username = uuid.uuid4().hex
  568         hints.add_filter(name=self.attribute_name,
  569                          value=username,
  570                          comparator='equals',
  571                          case_sensitive=False)
  572         expected_ldap_filter = '(&(%s=%s))' % (
  573             self.filter_attribute_name, username)
  574         self.assertEqual(expected_ldap_filter,
  575                          self.base_ldap.filter_query(hints=hints))
  576 
  577     def test_filter_with_both_query_and_hints_set(self):
  578         hints = driver_hints.Hints()
  579         # NOTE: doesn't have to be a real query, we just need to make sure the
  580         # filter string is concatenated correctly
  581         query = uuid.uuid4().hex
  582         username = uuid.uuid4().hex
  583         expected_result = '(&%(query)s(%(user_name_attr)s=%(username)s))' % (
  584             {'query': query,
  585              'user_name_attr': self.filter_attribute_name,
  586              'username': username})
  587         hints.add_filter(self.attribute_name, username)
  588         self.assertEqual(expected_result,
  589                          self.base_ldap.filter_query(hints=hints, query=query))
  590 
  591     def test_filter_with_hints_and_query_is_none(self):
  592         hints = driver_hints.Hints()
  593         username = uuid.uuid4().hex
  594         hints.add_filter(name=self.attribute_name,
  595                          value=username,
  596                          comparator='equals',
  597                          case_sensitive=False)
  598         expected_ldap_filter = '(&(%s=%s))' % (
  599             self.filter_attribute_name, username)
  600         self.assertEqual(expected_ldap_filter,
  601                          self.base_ldap.filter_query(hints=hints, query=None))
  602 
  603 
  604 class LDAPSizeLimitTest(unit.TestCase):
  605     """Test the size limit exceeded handling in keystone.common.ldap.core."""
  606 
  607     def setUp(self):
  608         super(LDAPSizeLimitTest, self).setUp()
  609 
  610         self.useFixture(ldapdb.LDAPDatabase())
  611         self.useFixture(database.Database())
  612 
  613         self.load_backends()
  614         self.load_fixtures(default_fixtures)
  615 
  616     def config_overrides(self):
  617         super(LDAPSizeLimitTest, self).config_overrides()
  618         self.config_fixture.config(group='identity', driver='ldap')
  619 
  620     def config_files(self):
  621         config_files = super(LDAPSizeLimitTest, self).config_files()
  622         config_files.append(unit.dirs.tests_conf('backend_ldap.conf'))
  623         return config_files
  624 
  625     @mock.patch.object(fakeldap.FakeLdap, 'search_s')
  626     def test_search_s_sizelimit_exceeded(self, mock_search_s):
  627         mock_search_s.side_effect = ldap.SIZELIMIT_EXCEEDED
  628         conn = PROVIDERS.identity_api.user.get_connection()
  629         self.assertRaises(ks_exception.LDAPSizeLimitExceeded,
  630                           conn.search_s,
  631                           'dc=example,dc=test',
  632                           ldap.SCOPE_SUBTREE)