"Fossies" - the Fresh Open Source Software Archive

Member "keystone-17.0.0/keystone/tests/unit/test_v3_federation.py" (13 May 2020, 208954 Bytes) of package /linux/misc/openstack/keystone-17.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_v3_federation.py": 16.0.1_vs_17.0.0.

    1 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 # not use this file except in compliance with the License. You may obtain
    3 # a copy of the License at
    4 #
    5 #      http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 # Unless required by applicable law or agreed to in writing, software
    8 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 # License for the specific language governing permissions and limitations
   11 # under the License.
   12 
   13 import copy
   14 import os
   15 import random
   16 import re
   17 import subprocess
   18 from testtools import matchers
   19 from unittest import mock
   20 import uuid
   21 
   22 import fixtures
   23 import flask
   24 import http.client
   25 from lxml import etree
   26 from oslo_serialization import jsonutils
   27 from oslo_utils import importutils
   28 import saml2
   29 from saml2 import saml
   30 from saml2 import sigver
   31 import urllib
   32 xmldsig = importutils.try_import("saml2.xmldsig")
   33 if not xmldsig:
   34     xmldsig = importutils.try_import("xmldsig")
   35 
   36 from keystone.api._shared import authentication
   37 from keystone.api import auth as auth_api
   38 from keystone.common import driver_hints
   39 from keystone.common import provider_api
   40 from keystone.common import render_token
   41 import keystone.conf
   42 from keystone import exception
   43 from keystone.federation import idp as keystone_idp
   44 from keystone.models import token_model
   45 from keystone import notifications
   46 from keystone.tests import unit
   47 from keystone.tests.unit import core
   48 from keystone.tests.unit import federation_fixtures
   49 from keystone.tests.unit import ksfixtures
   50 from keystone.tests.unit import mapping_fixtures
   51 from keystone.tests.unit import test_v3
   52 
   53 
   54 CONF = keystone.conf.CONF
   55 PROVIDERS = provider_api.ProviderAPIs
   56 ROOTDIR = os.path.dirname(os.path.abspath(__file__))
   57 XMLDIR = os.path.join(ROOTDIR, 'saml2/')
   58 
   59 
   60 def dummy_validator(*args, **kwargs):
   61     pass
   62 
   63 
   64 class FederatedSetupMixin(object):
   65 
   66     ACTION = 'authenticate'
   67     IDP = 'ORG_IDP'
   68     PROTOCOL = 'saml2'
   69     AUTH_METHOD = 'saml2'
   70     USER = 'user@ORGANIZATION'
   71     ASSERTION_PREFIX = 'PREFIX_'
   72     IDP_WITH_REMOTE = 'ORG_IDP_REMOTE'
   73     REMOTE_IDS = ['entityID_IDP1', 'entityID_IDP2']
   74     REMOTE_ID_ATTR = uuid.uuid4().hex
   75 
   76     UNSCOPED_V3_SAML2_REQ = {
   77         "identity": {
   78             "methods": [AUTH_METHOD],
   79             AUTH_METHOD: {
   80                 "identity_provider": IDP,
   81                 "protocol": PROTOCOL
   82             }
   83         }
   84     }
   85 
   86     def _check_domains_are_valid(self, token):
   87         domain = PROVIDERS.resource_api.get_domain(self.idp['domain_id'])
   88         self.assertEqual(domain['id'], token['user']['domain']['id'])
   89         self.assertEqual(domain['name'], token['user']['domain']['name'])
   90 
   91     def _project(self, project):
   92         return (project['id'], project['name'])
   93 
   94     def _roles(self, roles):
   95         return set([(r['id'], r['name']) for r in roles])
   96 
   97     def _check_projects_and_roles(self, token, roles, projects):
   98         """Check whether the projects and the roles match."""
   99         token_roles = token.get('roles')
  100         if token_roles is None:
  101             raise AssertionError('Roles not found in the token')
  102         token_roles = self._roles(token_roles)
  103         roles_ref = self._roles(roles)
  104         self.assertEqual(token_roles, roles_ref)
  105 
  106         token_projects = token.get('project')
  107         if token_projects is None:
  108             raise AssertionError('Projects not found in the token')
  109         token_projects = self._project(token_projects)
  110         projects_ref = self._project(projects)
  111         self.assertEqual(token_projects, projects_ref)
  112 
  113     def _check_scoped_token_attributes(self, token):
  114 
  115         for obj in ('user', 'catalog', 'expires_at', 'issued_at',
  116                     'methods', 'roles'):
  117             self.assertIn(obj, token)
  118 
  119         os_federation = token['user']['OS-FEDERATION']
  120 
  121         self.assertIn('groups', os_federation)
  122         self.assertIn('identity_provider', os_federation)
  123         self.assertIn('protocol', os_federation)
  124         self.assertThat(os_federation, matchers.HasLength(3))
  125 
  126         self.assertEqual(self.IDP, os_federation['identity_provider']['id'])
  127         self.assertEqual(self.PROTOCOL, os_federation['protocol']['id'])
  128 
  129     def _check_project_scoped_token_attributes(self, token, project_id):
  130         self.assertEqual(project_id, token['project']['id'])
  131         self._check_scoped_token_attributes(token)
  132 
  133     def _check_domain_scoped_token_attributes(self, token, domain_id):
  134         self.assertEqual(domain_id, token['domain']['id'])
  135         self._check_scoped_token_attributes(token)
  136 
  137     def assertValidMappedUser(self, token):
  138         """Check if user object meets all the criteria."""
  139         user = token['user']
  140         self.assertIn('id', user)
  141         self.assertIn('name', user)
  142         self.assertIn('domain', user)
  143 
  144         self.assertIn('groups', user['OS-FEDERATION'])
  145         self.assertIn('identity_provider', user['OS-FEDERATION'])
  146         self.assertIn('protocol', user['OS-FEDERATION'])
  147 
  148         # Make sure user_name is url safe
  149         self.assertEqual(urllib.parse.quote(user['name']), user['name'])
  150 
  151     def _issue_unscoped_token(self,
  152                               idp=None,
  153                               assertion='EMPLOYEE_ASSERTION',
  154                               environment=None):
  155         environment = environment or {}
  156         environment.update(getattr(mapping_fixtures, assertion))
  157         with self.make_request(environ=environment):
  158             if idp is None:
  159                 idp = self.IDP
  160             r = authentication.federated_authenticate_for_token(
  161                 protocol_id=self.PROTOCOL, identity_provider=idp)
  162         return r
  163 
  164     def idp_ref(self, id=None):
  165         idp = {
  166             'id': id or uuid.uuid4().hex,
  167             'enabled': True,
  168             'description': uuid.uuid4().hex
  169         }
  170         return idp
  171 
  172     def proto_ref(self, mapping_id=None):
  173         proto = {
  174             'id': uuid.uuid4().hex,
  175             'mapping_id': mapping_id or uuid.uuid4().hex
  176         }
  177         return proto
  178 
  179     def mapping_ref(self, rules=None):
  180         return {
  181             'id': uuid.uuid4().hex,
  182             'rules': rules or self.rules['rules']
  183         }
  184 
  185     def _scope_request(self, unscoped_token_id, scope, scope_id):
  186         return {
  187             'auth': {
  188                 'identity': {
  189                     'methods': [
  190                         'token'
  191                     ],
  192                     'token': {
  193                         'id': unscoped_token_id
  194                     }
  195                 },
  196                 'scope': {
  197                     scope: {
  198                         'id': scope_id
  199                     }
  200                 }
  201             }
  202         }
  203 
  204     def _inject_assertion(self, variant):
  205         assertion = getattr(mapping_fixtures, variant)
  206         flask.request.environ.update(assertion)
  207 
  208     def load_federation_sample_data(self):
  209         """Inject additional data."""
  210         # Create and add domains
  211         self.domainA = unit.new_domain_ref()
  212         PROVIDERS.resource_api.create_domain(
  213             self.domainA['id'], self.domainA
  214         )
  215 
  216         self.domainB = unit.new_domain_ref()
  217         PROVIDERS.resource_api.create_domain(
  218             self.domainB['id'], self.domainB
  219         )
  220 
  221         self.domainC = unit.new_domain_ref()
  222         PROVIDERS.resource_api.create_domain(
  223             self.domainC['id'], self.domainC
  224         )
  225 
  226         self.domainD = unit.new_domain_ref()
  227         PROVIDERS.resource_api.create_domain(
  228             self.domainD['id'], self.domainD
  229         )
  230 
  231         # Create and add projects
  232         self.proj_employees = unit.new_project_ref(
  233             domain_id=self.domainA['id'])
  234         PROVIDERS.resource_api.create_project(
  235             self.proj_employees['id'], self.proj_employees
  236         )
  237         self.proj_customers = unit.new_project_ref(
  238             domain_id=self.domainA['id'])
  239         PROVIDERS.resource_api.create_project(
  240             self.proj_customers['id'], self.proj_customers
  241         )
  242 
  243         self.project_all = unit.new_project_ref(
  244             domain_id=self.domainA['id'])
  245         PROVIDERS.resource_api.create_project(
  246             self.project_all['id'], self.project_all
  247         )
  248 
  249         self.project_inherited = unit.new_project_ref(
  250             domain_id=self.domainD['id'])
  251         PROVIDERS.resource_api.create_project(
  252             self.project_inherited['id'], self.project_inherited
  253         )
  254 
  255         # Create and add groups
  256         self.group_employees = unit.new_group_ref(domain_id=self.domainA['id'])
  257         self.group_employees = (
  258             PROVIDERS.identity_api.create_group(self.group_employees))
  259 
  260         self.group_customers = unit.new_group_ref(domain_id=self.domainA['id'])
  261         self.group_customers = (
  262             PROVIDERS.identity_api.create_group(self.group_customers))
  263 
  264         self.group_admins = unit.new_group_ref(domain_id=self.domainA['id'])
  265         self.group_admins = PROVIDERS.identity_api.create_group(
  266             self.group_admins
  267         )
  268 
  269         # Create and add roles
  270         self.role_employee = unit.new_role_ref()
  271         PROVIDERS.role_api.create_role(
  272             self.role_employee['id'], self.role_employee
  273         )
  274         self.role_customer = unit.new_role_ref()
  275         PROVIDERS.role_api.create_role(
  276             self.role_customer['id'], self.role_customer
  277         )
  278 
  279         self.role_admin = unit.new_role_ref()
  280         PROVIDERS.role_api.create_role(self.role_admin['id'], self.role_admin)
  281 
  282         # Employees can access
  283         # * proj_employees
  284         # * project_all
  285         PROVIDERS.assignment_api.create_grant(
  286             self.role_employee['id'], group_id=self.group_employees['id'],
  287             project_id=self.proj_employees['id']
  288         )
  289         PROVIDERS.assignment_api.create_grant(
  290             self.role_employee['id'], group_id=self.group_employees['id'],
  291             project_id=self.project_all['id']
  292         )
  293         # Customers can access
  294         # * proj_customers
  295         PROVIDERS.assignment_api.create_grant(
  296             self.role_customer['id'], group_id=self.group_customers['id'],
  297             project_id=self.proj_customers['id']
  298         )
  299 
  300         # Admins can access:
  301         # * proj_customers
  302         # * proj_employees
  303         # * project_all
  304         PROVIDERS.assignment_api.create_grant(
  305             self.role_admin['id'], group_id=self.group_admins['id'],
  306             project_id=self.proj_customers['id']
  307         )
  308         PROVIDERS.assignment_api.create_grant(
  309             self.role_admin['id'], group_id=self.group_admins['id'],
  310             project_id=self.proj_employees['id']
  311         )
  312         PROVIDERS.assignment_api.create_grant(
  313             self.role_admin['id'], group_id=self.group_admins['id'],
  314             project_id=self.project_all['id']
  315         )
  316 
  317         # Customers can access:
  318         # * domain A
  319         PROVIDERS.assignment_api.create_grant(
  320             self.role_customer['id'], group_id=self.group_customers['id'],
  321             domain_id=self.domainA['id']
  322         )
  323 
  324         # Customers can access projects via inheritance:
  325         # * domain D
  326         PROVIDERS.assignment_api.create_grant(
  327             self.role_customer['id'], group_id=self.group_customers['id'],
  328             domain_id=self.domainD['id'], inherited_to_projects=True
  329         )
  330 
  331         # Employees can access:
  332         # * domain A
  333         # * domain B
  334 
  335         PROVIDERS.assignment_api.create_grant(
  336             self.role_employee['id'], group_id=self.group_employees['id'],
  337             domain_id=self.domainA['id']
  338         )
  339         PROVIDERS.assignment_api.create_grant(
  340             self.role_employee['id'], group_id=self.group_employees['id'],
  341             domain_id=self.domainB['id']
  342         )
  343 
  344         # Admins can access:
  345         # * domain A
  346         # * domain B
  347         # * domain C
  348         PROVIDERS.assignment_api.create_grant(
  349             self.role_admin['id'], group_id=self.group_admins['id'],
  350             domain_id=self.domainA['id']
  351         )
  352         PROVIDERS.assignment_api.create_grant(
  353             self.role_admin['id'], group_id=self.group_admins['id'],
  354             domain_id=self.domainB['id']
  355         )
  356 
  357         PROVIDERS.assignment_api.create_grant(
  358             self.role_admin['id'], group_id=self.group_admins['id'],
  359             domain_id=self.domainC['id']
  360         )
  361         self.rules = {
  362             'rules': [
  363                 {
  364                     'local': [
  365                         {
  366                             'group': {
  367                                 'id': self.group_employees['id']
  368                             }
  369                         },
  370                         {
  371                             'user': {
  372                                 'name': '{0}',
  373                                 'id': '{1}'
  374                             }
  375                         }
  376                     ],
  377                     'remote': [
  378                         {
  379                             'type': 'UserName'
  380                         },
  381                         {
  382                             'type': 'Email',
  383                         },
  384                         {
  385                             'type': 'orgPersonType',
  386                             'any_one_of': [
  387                                 'Employee'
  388                             ]
  389                         }
  390                     ]
  391                 },
  392                 {
  393                     'local': [
  394                         {
  395                             'group': {
  396                                 'id': self.group_employees['id']
  397                             }
  398                         },
  399                         {
  400                             'user': {
  401                                 'name': '{0}',
  402                                 'id': '{1}'
  403                             }
  404                         }
  405                     ],
  406                     'remote': [
  407                         {
  408                             'type': self.ASSERTION_PREFIX + 'UserName'
  409                         },
  410                         {
  411                             'type': self.ASSERTION_PREFIX + 'Email',
  412                         },
  413                         {
  414                             'type': self.ASSERTION_PREFIX + 'orgPersonType',
  415                             'any_one_of': [
  416                                 'SuperEmployee'
  417                             ]
  418                         }
  419                     ]
  420                 },
  421                 {
  422                     'local': [
  423                         {
  424                             'group': {
  425                                 'id': self.group_customers['id']
  426                             }
  427                         },
  428                         {
  429                             'user': {
  430                                 'name': '{0}',
  431                                 'id': '{1}'
  432                             }
  433                         }
  434                     ],
  435                     'remote': [
  436                         {
  437                             'type': 'UserName'
  438                         },
  439                         {
  440                             'type': 'Email'
  441                         },
  442                         {
  443                             'type': 'orgPersonType',
  444                             'any_one_of': [
  445                                 'Customer'
  446                             ]
  447                         }
  448                     ]
  449                 },
  450                 {
  451                     'local': [
  452                         {
  453                             'group': {
  454                                 'id': self.group_admins['id']
  455                             }
  456                         },
  457                         {
  458                             'group': {
  459                                 'id': self.group_employees['id']
  460                             }
  461                         },
  462                         {
  463                             'group': {
  464                                 'id': self.group_customers['id']
  465                             }
  466                         },
  467 
  468                         {
  469                             'user': {
  470                                 'name': '{0}',
  471                                 'id': '{1}'
  472                             }
  473                         }
  474                     ],
  475                     'remote': [
  476                         {
  477                             'type': 'UserName'
  478                         },
  479                         {
  480                             'type': 'Email'
  481                         },
  482                         {
  483                             'type': 'orgPersonType',
  484                             'any_one_of': [
  485                                 'Admin',
  486                                 'Chief'
  487                             ]
  488                         }
  489                     ]
  490                 },
  491                 {
  492                     'local': [
  493                         {
  494                             'group': {
  495                                 'id': uuid.uuid4().hex
  496                             }
  497                         },
  498                         {
  499                             'group': {
  500                                 'id': self.group_customers['id']
  501                             }
  502                         },
  503                         {
  504                             'user': {
  505                                 'name': '{0}',
  506                                 'id': '{1}'
  507                             }
  508                         }
  509                     ],
  510                     'remote': [
  511                         {
  512                             'type': 'UserName',
  513                         },
  514                         {
  515                             'type': 'Email',
  516                         },
  517                         {
  518                             'type': 'FirstName',
  519                             'any_one_of': [
  520                                 'Jill'
  521                             ]
  522                         },
  523                         {
  524                             'type': 'LastName',
  525                             'any_one_of': [
  526                                 'Smith'
  527                             ]
  528                         }
  529                     ]
  530                 },
  531                 {
  532                     'local': [
  533                         {
  534                             'group': {
  535                                 'id': 'this_group_no_longer_exists'
  536                             }
  537                         },
  538                         {
  539                             'user': {
  540                                 'name': '{0}',
  541                                 'id': '{1}'
  542                             }
  543                         }
  544                     ],
  545                     'remote': [
  546                         {
  547                             'type': 'UserName',
  548                         },
  549                         {
  550                             'type': 'Email',
  551                         },
  552                         {
  553                             'type': 'Email',
  554                             'any_one_of': [
  555                                 'testacct@example.com'
  556                             ]
  557                         },
  558                         {
  559                             'type': 'orgPersonType',
  560                             'any_one_of': [
  561                                 'Tester'
  562                             ]
  563                         }
  564                     ]
  565                 },
  566                 # rules with local group names
  567                 {
  568                     "local": [
  569                         {
  570                             'user': {
  571                                 'name': '{0}',
  572                                 'id': '{1}'
  573                             }
  574                         },
  575                         {
  576                             "group": {
  577                                 "name": self.group_customers['name'],
  578                                 "domain": {
  579                                     "name": self.domainA['name']
  580                                 }
  581                             }
  582                         }
  583                     ],
  584                     "remote": [
  585                         {
  586                             'type': 'UserName',
  587                         },
  588                         {
  589                             'type': 'Email',
  590                         },
  591                         {
  592                             "type": "orgPersonType",
  593                             "any_one_of": [
  594                                 "CEO",
  595                                 "CTO"
  596                             ],
  597                         }
  598                     ]
  599                 },
  600                 {
  601                     "local": [
  602                         {
  603                             'user': {
  604                                 'name': '{0}',
  605                                 'id': '{1}'
  606                             }
  607                         },
  608                         {
  609                             "group": {
  610                                 "name": self.group_admins['name'],
  611                                 "domain": {
  612                                     "id": self.domainA['id']
  613                                 }
  614                             }
  615                         }
  616                     ],
  617                     "remote": [
  618                         {
  619                             "type": "UserName",
  620                         },
  621                         {
  622                             "type": "Email",
  623                         },
  624                         {
  625                             "type": "orgPersonType",
  626                             "any_one_of": [
  627                                 "Managers"
  628                             ]
  629                         }
  630                     ]
  631                 },
  632                 {
  633                     "local": [
  634                         {
  635                             "user": {
  636                                 "name": "{0}",
  637                                 "id": "{1}"
  638                             }
  639                         },
  640                         {
  641                             "group": {
  642                                 "name": "NON_EXISTING",
  643                                 "domain": {
  644                                     "id": self.domainA['id']
  645                                 }
  646                             }
  647                         }
  648                     ],
  649                     "remote": [
  650                         {
  651                             "type": "UserName",
  652                         },
  653                         {
  654                             "type": "Email",
  655                         },
  656                         {
  657                             "type": "UserName",
  658                             "any_one_of": [
  659                                 "IamTester"
  660                             ]
  661                         }
  662                     ]
  663                 },
  664                 {
  665                     "local": [
  666                         {
  667                             "user": {
  668                                 "type": "local",
  669                                 "name": self.user['name'],
  670                                 "domain": {
  671                                     "id": self.user['domain_id']
  672                                 }
  673                             }
  674                         },
  675                         {
  676                             "group": {
  677                                 "id": self.group_customers['id']
  678                             }
  679                         }
  680                     ],
  681                     "remote": [
  682                         {
  683                             "type": "UserType",
  684                             "any_one_of": [
  685                                 "random"
  686                             ]
  687                         }
  688                     ]
  689                 },
  690                 {
  691                     "local": [
  692                         {
  693                             "user": {
  694                                 "type": "local",
  695                                 "name": self.user['name'],
  696                                 "domain": {
  697                                     "id": uuid.uuid4().hex
  698                                 }
  699                             }
  700                         }
  701                     ],
  702                     "remote": [
  703                         {
  704                             "type": "Position",
  705                             "any_one_of": [
  706                                 "DirectorGeneral"
  707                             ]
  708                         }
  709                     ]
  710                 },
  711                 # rules for users with no groups
  712                 {
  713                     "local": [
  714                         {
  715                             'user': {
  716                                 'name': '{0}',
  717                                 'id': '{1}'
  718                             }
  719                         }
  720                     ],
  721                     "remote": [
  722                         {
  723                             'type': 'UserName',
  724                         },
  725                         {
  726                             'type': 'Email',
  727                         },
  728                         {
  729                             'type': 'orgPersonType',
  730                             'any_one_of': [
  731                                 'NoGroupsOrg'
  732                             ]
  733                         }
  734                     ]
  735                 }
  736             ]
  737         }
  738 
  739         # Add unused IdP first so it is indexed first (#1838592)
  740         self.dummy_idp = self.idp_ref()
  741         PROVIDERS.federation_api.create_idp(
  742             self.dummy_idp['id'], self.dummy_idp
  743         )
  744         # Add IDP
  745         self.idp = self.idp_ref(id=self.IDP)
  746         PROVIDERS.federation_api.create_idp(
  747             self.idp['id'], self.idp
  748         )
  749         # Add IDP with remote
  750         self.idp_with_remote = self.idp_ref(id=self.IDP_WITH_REMOTE)
  751         self.idp_with_remote['remote_ids'] = self.REMOTE_IDS
  752         PROVIDERS.federation_api.create_idp(
  753             self.idp_with_remote['id'], self.idp_with_remote
  754         )
  755         # Add a mapping
  756         self.mapping = self.mapping_ref()
  757         PROVIDERS.federation_api.create_mapping(
  758             self.mapping['id'], self.mapping
  759         )
  760         # Add protocols
  761         self.proto_saml = self.proto_ref(mapping_id=self.mapping['id'])
  762         self.proto_saml['id'] = self.PROTOCOL
  763         PROVIDERS.federation_api.create_protocol(
  764             self.idp['id'], self.proto_saml['id'], self.proto_saml
  765         )
  766         # Add protocols IDP with remote
  767         PROVIDERS.federation_api.create_protocol(
  768             self.idp_with_remote['id'], self.proto_saml['id'], self.proto_saml
  769         )
  770         # Add unused protocol to go with unused IdP (#1838592)
  771         self.proto_dummy = self.proto_ref(mapping_id=self.mapping['id'])
  772         PROVIDERS.federation_api.create_protocol(
  773             self.dummy_idp['id'], self.proto_dummy['id'], self.proto_dummy
  774         )
  775 
  776         with self.make_request():
  777             self.tokens = {}
  778             VARIANTS = ('EMPLOYEE_ASSERTION', 'CUSTOMER_ASSERTION',
  779                         'ADMIN_ASSERTION')
  780             for variant in VARIANTS:
  781                 self._inject_assertion(variant)
  782                 r = authentication.authenticate_for_token(
  783                     self.UNSCOPED_V3_SAML2_REQ)
  784                 self.tokens[variant] = r.id
  785 
  786             self.TOKEN_SCOPE_PROJECT_FROM_NONEXISTENT_TOKEN = (
  787                 self._scope_request(
  788                     uuid.uuid4().hex, 'project', self.proj_customers['id']))
  789 
  790             self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE = (
  791                 self._scope_request(
  792                     self.tokens['EMPLOYEE_ASSERTION'], 'project',
  793                     self.proj_employees['id']))
  794 
  795             self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_ADMIN = self._scope_request(
  796                 self.tokens['ADMIN_ASSERTION'], 'project',
  797                 self.proj_employees['id'])
  798 
  799             self.TOKEN_SCOPE_PROJECT_CUSTOMER_FROM_ADMIN = self._scope_request(
  800                 self.tokens['ADMIN_ASSERTION'], 'project',
  801                 self.proj_customers['id'])
  802 
  803             self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER = (
  804                 self._scope_request(
  805                     self.tokens['CUSTOMER_ASSERTION'], 'project',
  806                     self.proj_employees['id']))
  807 
  808             self.TOKEN_SCOPE_PROJECT_INHERITED_FROM_CUSTOMER = (
  809                 self._scope_request(
  810                     self.tokens['CUSTOMER_ASSERTION'], 'project',
  811                     self.project_inherited['id']))
  812 
  813             self.TOKEN_SCOPE_DOMAIN_A_FROM_CUSTOMER = self._scope_request(
  814                 self.tokens['CUSTOMER_ASSERTION'], 'domain',
  815                 self.domainA['id'])
  816 
  817             self.TOKEN_SCOPE_DOMAIN_B_FROM_CUSTOMER = self._scope_request(
  818                 self.tokens['CUSTOMER_ASSERTION'], 'domain',
  819                 self.domainB['id'])
  820 
  821             self.TOKEN_SCOPE_DOMAIN_D_FROM_CUSTOMER = self._scope_request(
  822                 self.tokens['CUSTOMER_ASSERTION'], 'domain',
  823                 self.domainD['id'])
  824 
  825             self.TOKEN_SCOPE_DOMAIN_A_FROM_ADMIN = self._scope_request(
  826                 self.tokens['ADMIN_ASSERTION'], 'domain', self.domainA['id'])
  827 
  828             self.TOKEN_SCOPE_DOMAIN_B_FROM_ADMIN = self._scope_request(
  829                 self.tokens['ADMIN_ASSERTION'], 'domain', self.domainB['id'])
  830 
  831             self.TOKEN_SCOPE_DOMAIN_C_FROM_ADMIN = self._scope_request(
  832                 self.tokens['ADMIN_ASSERTION'], 'domain',
  833                 self.domainC['id'])
  834 
  835 
  836 class FederatedIdentityProviderTests(test_v3.RestfulTestCase):
  837     """A test class for Identity Providers."""
  838 
  839     idp_keys = ['description', 'enabled']
  840 
  841     default_body = {'description': None, 'enabled': True}
  842 
  843     def base_url(self, suffix=None):
  844         if suffix is not None:
  845             return '/OS-FEDERATION/identity_providers/' + str(suffix)
  846         return '/OS-FEDERATION/identity_providers'
  847 
  848     def _fetch_attribute_from_response(self, resp, parameter,
  849                                        assert_is_not_none=True):
  850         """Fetch single attribute from TestResponse object."""
  851         result = resp.result.get(parameter)
  852         if assert_is_not_none:
  853             self.assertIsNotNone(result)
  854         return result
  855 
  856     def _create_and_decapsulate_response(self, body=None):
  857         """Create IdP and fetch it's random id along with entity."""
  858         default_resp = self._create_default_idp(body=body)
  859         idp = self._fetch_attribute_from_response(default_resp,
  860                                                   'identity_provider')
  861         self.assertIsNotNone(idp)
  862         idp_id = idp.get('id')
  863         return (idp_id, idp)
  864 
  865     def _get_idp(self, idp_id):
  866         """Fetch IdP entity based on its id."""
  867         url = self.base_url(suffix=idp_id)
  868         resp = self.get(url)
  869         return resp
  870 
  871     def _create_default_idp(self, body=None,
  872                             expected_status=http.client.CREATED):
  873         """Create default IdP."""
  874         url = self.base_url(suffix=uuid.uuid4().hex)
  875         if body is None:
  876             body = self._http_idp_input()
  877         resp = self.put(url, body={'identity_provider': body},
  878                         expected_status=expected_status)
  879         return resp
  880 
  881     def _http_idp_input(self):
  882         """Create default input dictionary for IdP data."""
  883         body = self.default_body.copy()
  884         body['description'] = uuid.uuid4().hex
  885         return body
  886 
  887     def _assign_protocol_to_idp(self, idp_id=None, proto=None, url=None,
  888                                 mapping_id=None, validate=True, **kwargs):
  889         if url is None:
  890             url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s')
  891         if idp_id is None:
  892             idp_id, _ = self._create_and_decapsulate_response()
  893         if proto is None:
  894             proto = uuid.uuid4().hex
  895         if mapping_id is None:
  896             mapping_id = uuid.uuid4().hex
  897         self._create_mapping(mapping_id)
  898         body = {'mapping_id': mapping_id}
  899         url = url % {'idp_id': idp_id, 'protocol_id': proto}
  900         resp = self.put(url, body={'protocol': body}, **kwargs)
  901         if validate:
  902             self.assertValidResponse(resp, 'protocol', dummy_validator,
  903                                      keys_to_check=['id', 'mapping_id'],
  904                                      ref={'id': proto,
  905                                           'mapping_id': mapping_id})
  906         return (resp, idp_id, proto)
  907 
  908     def _get_protocol(self, idp_id, protocol_id):
  909         url = '%s/protocols/%s' % (idp_id, protocol_id)
  910         url = self.base_url(suffix=url)
  911         r = self.get(url)
  912         return r
  913 
  914     def _create_mapping(self, mapping_id):
  915         mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER
  916         mapping['id'] = mapping_id
  917         url = '/OS-FEDERATION/mappings/%s' % mapping_id
  918         self.put(url,
  919                  body={'mapping': mapping},
  920                  expected_status=http.client.CREATED)
  921 
  922     def assertIdpDomainCreated(self, idp_id, domain_id):
  923         domain = PROVIDERS.resource_api.get_domain(domain_id)
  924         self.assertEqual(domain_id, domain['name'])
  925         self.assertIn(idp_id, domain['description'])
  926 
  927     def test_create_idp_without_domain_id(self):
  928         """Create the IdentityProvider entity associated to remote_ids."""
  929         keys_to_check = list(self.idp_keys)
  930         body = self.default_body.copy()
  931         body['description'] = uuid.uuid4().hex
  932         resp = self._create_default_idp(body=body)
  933         self.assertValidResponse(resp, 'identity_provider', dummy_validator,
  934                                  keys_to_check=keys_to_check,
  935                                  ref=body)
  936         attr = self._fetch_attribute_from_response(resp, 'identity_provider')
  937         self.assertIdpDomainCreated(attr['id'], attr['domain_id'])
  938 
  939     def test_create_idp_with_domain_id(self):
  940         keys_to_check = list(self.idp_keys)
  941         keys_to_check.append('domain_id')
  942         body = self.default_body.copy()
  943         body['description'] = uuid.uuid4().hex
  944         domain = unit.new_domain_ref()
  945         PROVIDERS.resource_api.create_domain(domain['id'], domain)
  946         body['domain_id'] = domain['id']
  947         resp = self._create_default_idp(body=body)
  948         self.assertValidResponse(resp, 'identity_provider', dummy_validator,
  949                                  keys_to_check=keys_to_check,
  950                                  ref=body)
  951 
  952     def test_create_idp_domain_id_none(self):
  953         keys_to_check = list(self.idp_keys)
  954         body = self.default_body.copy()
  955         body['description'] = uuid.uuid4().hex
  956         body['domain_id'] = None
  957         resp = self._create_default_idp(body=body)
  958         self.assertValidResponse(resp, 'identity_provider', dummy_validator,
  959                                  keys_to_check=keys_to_check,
  960                                  ref=body)
  961         attr = self._fetch_attribute_from_response(resp, 'identity_provider')
  962         self.assertIdpDomainCreated(attr['id'], attr['domain_id'])
  963 
  964     def test_conflicting_idp_cleans_up_auto_generated_domain(self):
  965         # NOTE(lbragstad): Create an identity provider, save its ID, and count
  966         # the number of domains.
  967         resp = self._create_default_idp()
  968         idp_id = resp.json_body['identity_provider']['id']
  969         domains = PROVIDERS.resource_api.list_domains()
  970         number_of_domains = len(domains)
  971 
  972         # Create an identity provider with the same ID to intentionally cause a
  973         # conflict, this is going to result in a domain getting created for the
  974         # new identity provider. The domain for the new identity provider is
  975         # going to be created before the conflict is raised from the database
  976         # layer. This makes sure the domain is cleaned up after a Conflict is
  977         # detected.
  978         resp = self.put(
  979             self.base_url(suffix=idp_id),
  980             body={'identity_provider': self.default_body.copy()},
  981             expected_status=http.client.CONFLICT
  982         )
  983         domains = PROVIDERS.resource_api.list_domains()
  984         self.assertEqual(number_of_domains, len(domains))
  985 
  986     def test_conflicting_idp_does_not_delete_existing_domain(self):
  987         # Create a new domain
  988         domain = unit.new_domain_ref()
  989         PROVIDERS.resource_api.create_domain(domain['id'], domain)
  990 
  991         # Create an identity provider and specify the domain
  992         body = self.default_body.copy()
  993         body['description'] = uuid.uuid4().hex
  994         body['domain_id'] = domain['id']
  995         resp = self._create_default_idp(body=body)
  996         idp = resp.json_body['identity_provider']
  997         idp_id = idp['id']
  998         self.assertEqual(idp['domain_id'], domain['id'])
  999 
 1000         # Create an identity provider with the same domain and ID to ensure a
 1001         # Conflict is raised and then to verify the existing domain not deleted
 1002         # below
 1003         body = self.default_body.copy()
 1004         body['domain_id'] = domain['id']
 1005         resp = self.put(
 1006             self.base_url(suffix=idp_id),
 1007             body={'identity_provider': body},
 1008             expected_status=http.client.CONFLICT
 1009         )
 1010 
 1011         # Make sure the domain specified in the second request was not deleted,
 1012         # since it wasn't auto-generated
 1013         self.assertIsNotNone(PROVIDERS.resource_api.get_domain(domain['id']))
 1014 
 1015     def test_create_multi_idp_to_one_domain(self):
 1016         # create domain and add domain_id to keys to check
 1017         domain = unit.new_domain_ref()
 1018         PROVIDERS.resource_api.create_domain(domain['id'], domain)
 1019         keys_to_check = list(self.idp_keys)
 1020         keys_to_check.append('domain_id')
 1021         # create idp with the domain_id
 1022         body = self.default_body.copy()
 1023         body['description'] = uuid.uuid4().hex
 1024         body['domain_id'] = domain['id']
 1025         idp1 = self._create_default_idp(body=body)
 1026         self.assertValidResponse(idp1, 'identity_provider', dummy_validator,
 1027                                  keys_to_check=keys_to_check,
 1028                                  ref=body)
 1029         # create a 2nd idp with the same domain_id
 1030         url = self.base_url(suffix=uuid.uuid4().hex)
 1031         body = self.default_body.copy()
 1032         body['description'] = uuid.uuid4().hex
 1033         body['domain_id'] = domain['id']
 1034         idp2 = self.put(url, body={'identity_provider': body},
 1035                         expected_status=http.client.CREATED)
 1036         self.assertValidResponse(idp2, 'identity_provider', dummy_validator,
 1037                                  keys_to_check=keys_to_check,
 1038                                  ref=body)
 1039 
 1040         self.assertEqual(idp1.result['identity_provider']['domain_id'],
 1041                          idp2.result['identity_provider']['domain_id'])
 1042 
 1043     def test_cannot_update_idp_domain(self):
 1044         # create new idp
 1045         body = self.default_body.copy()
 1046         default_resp = self._create_default_idp(body=body)
 1047         default_idp = self._fetch_attribute_from_response(default_resp,
 1048                                                           'identity_provider')
 1049         idp_id = default_idp.get('id')
 1050         self.assertIsNotNone(idp_id)
 1051         # create domain and try to update the idp's domain
 1052         domain = unit.new_domain_ref()
 1053         PROVIDERS.resource_api.create_domain(domain['id'], domain)
 1054         body['domain_id'] = domain['id']
 1055         body = {'identity_provider': body}
 1056         url = self.base_url(suffix=idp_id)
 1057         self.patch(url, body=body, expected_status=http.client.BAD_REQUEST)
 1058 
 1059     def test_create_idp_with_nonexistent_domain_id_fails(self):
 1060         body = self.default_body.copy()
 1061         body['description'] = uuid.uuid4().hex
 1062         body['domain_id'] = uuid.uuid4().hex
 1063         self._create_default_idp(body=body,
 1064                                  expected_status=http.client.NOT_FOUND)
 1065 
 1066     def test_create_idp_remote(self):
 1067         """Create the IdentityProvider entity associated to remote_ids."""
 1068         keys_to_check = list(self.idp_keys)
 1069         keys_to_check.append('remote_ids')
 1070         body = self.default_body.copy()
 1071         body['description'] = uuid.uuid4().hex
 1072         body['remote_ids'] = [uuid.uuid4().hex,
 1073                               uuid.uuid4().hex,
 1074                               uuid.uuid4().hex]
 1075         resp = self._create_default_idp(body=body)
 1076         self.assertValidResponse(resp, 'identity_provider', dummy_validator,
 1077                                  keys_to_check=keys_to_check,
 1078                                  ref=body)
 1079         attr = self._fetch_attribute_from_response(resp, 'identity_provider')
 1080         self.assertIdpDomainCreated(attr['id'], attr['domain_id'])
 1081 
 1082     def test_create_idp_remote_repeated(self):
 1083         """Create two IdentityProvider entities with some remote_ids.
 1084 
 1085         A remote_id is the same for both so the second IdP is not
 1086         created because of the uniqueness of the remote_ids
 1087 
 1088         Expect HTTP 409 Conflict code for the latter call.
 1089 
 1090         """
 1091         body = self.default_body.copy()
 1092         repeated_remote_id = uuid.uuid4().hex
 1093         body['remote_ids'] = [uuid.uuid4().hex,
 1094                               uuid.uuid4().hex,
 1095                               uuid.uuid4().hex,
 1096                               repeated_remote_id]
 1097         self._create_default_idp(body=body)
 1098 
 1099         url = self.base_url(suffix=uuid.uuid4().hex)
 1100         body['remote_ids'] = [uuid.uuid4().hex,
 1101                               repeated_remote_id]
 1102         resp = self.put(url, body={'identity_provider': body},
 1103                         expected_status=http.client.CONFLICT)
 1104 
 1105         resp_data = jsonutils.loads(resp.body)
 1106         self.assertIn('Duplicate remote ID',
 1107                       resp_data.get('error', {}).get('message'))
 1108 
 1109     def test_create_idp_remote_empty(self):
 1110         """Create an IdP with empty remote_ids."""
 1111         keys_to_check = list(self.idp_keys)
 1112         keys_to_check.append('remote_ids')
 1113         body = self.default_body.copy()
 1114         body['description'] = uuid.uuid4().hex
 1115         body['remote_ids'] = []
 1116         resp = self._create_default_idp(body=body)
 1117         self.assertValidResponse(resp, 'identity_provider', dummy_validator,
 1118                                  keys_to_check=keys_to_check,
 1119                                  ref=body)
 1120 
 1121     def test_create_idp_remote_none(self):
 1122         """Create an IdP with a None remote_ids."""
 1123         keys_to_check = list(self.idp_keys)
 1124         keys_to_check.append('remote_ids')
 1125         body = self.default_body.copy()
 1126         body['description'] = uuid.uuid4().hex
 1127         body['remote_ids'] = None
 1128         resp = self._create_default_idp(body=body)
 1129         expected = body.copy()
 1130         expected['remote_ids'] = []
 1131         self.assertValidResponse(resp, 'identity_provider', dummy_validator,
 1132                                  keys_to_check=keys_to_check,
 1133                                  ref=expected)
 1134 
 1135     def test_create_idp_authorization_ttl(self):
 1136         keys_to_check = list(self.idp_keys)
 1137         keys_to_check.append('authorization_ttl')
 1138         body = self.default_body.copy()
 1139         body['description'] = uuid.uuid4().hex
 1140         body['authorization_ttl'] = 10080
 1141         resp = self._create_default_idp(body)
 1142         expected = body.copy()
 1143         self.assertValidResponse(resp, 'identity_provider', dummy_validator,
 1144                                  keys_to_check=keys_to_check,
 1145                                  ref=expected)
 1146 
 1147     def test_update_idp_remote_ids(self):
 1148         """Update IdP's remote_ids parameter."""
 1149         body = self.default_body.copy()
 1150         body['remote_ids'] = [uuid.uuid4().hex]
 1151         default_resp = self._create_default_idp(body=body)
 1152         default_idp = self._fetch_attribute_from_response(default_resp,
 1153                                                           'identity_provider')
 1154         idp_id = default_idp.get('id')
 1155         url = self.base_url(suffix=idp_id)
 1156         self.assertIsNotNone(idp_id)
 1157 
 1158         body['remote_ids'] = [uuid.uuid4().hex, uuid.uuid4().hex]
 1159 
 1160         body = {'identity_provider': body}
 1161         resp = self.patch(url, body=body)
 1162         updated_idp = self._fetch_attribute_from_response(resp,
 1163                                                           'identity_provider')
 1164         body = body['identity_provider']
 1165         self.assertEqual(sorted(body['remote_ids']),
 1166                          sorted(updated_idp.get('remote_ids')))
 1167 
 1168         resp = self.get(url)
 1169         returned_idp = self._fetch_attribute_from_response(resp,
 1170                                                            'identity_provider')
 1171         self.assertEqual(sorted(body['remote_ids']),
 1172                          sorted(returned_idp.get('remote_ids')))
 1173 
 1174     def test_update_idp_clean_remote_ids(self):
 1175         """Update IdP's remote_ids parameter with an empty list."""
 1176         body = self.default_body.copy()
 1177         body['remote_ids'] = [uuid.uuid4().hex]
 1178         default_resp = self._create_default_idp(body=body)
 1179         default_idp = self._fetch_attribute_from_response(default_resp,
 1180                                                           'identity_provider')
 1181         idp_id = default_idp.get('id')
 1182         url = self.base_url(suffix=idp_id)
 1183         self.assertIsNotNone(idp_id)
 1184 
 1185         body['remote_ids'] = []
 1186 
 1187         body = {'identity_provider': body}
 1188         resp = self.patch(url, body=body)
 1189         updated_idp = self._fetch_attribute_from_response(resp,
 1190                                                           'identity_provider')
 1191         body = body['identity_provider']
 1192         self.assertEqual(sorted(body['remote_ids']),
 1193                          sorted(updated_idp.get('remote_ids')))
 1194 
 1195         resp = self.get(url)
 1196         returned_idp = self._fetch_attribute_from_response(resp,
 1197                                                            'identity_provider')
 1198         self.assertEqual(sorted(body['remote_ids']),
 1199                          sorted(returned_idp.get('remote_ids')))
 1200 
 1201     def test_update_idp_remote_repeated(self):
 1202         """Update an IdentityProvider entity reusing a remote_id.
 1203 
 1204         A remote_id is the same for both so the second IdP is not
 1205         updated because of the uniqueness of the remote_ids.
 1206 
 1207         Expect HTTP 409 Conflict code for the latter call.
 1208 
 1209         """
 1210         # Create first identity provider
 1211         body = self.default_body.copy()
 1212         repeated_remote_id = uuid.uuid4().hex
 1213         body['remote_ids'] = [uuid.uuid4().hex, repeated_remote_id]
 1214         self._create_default_idp(body=body)
 1215 
 1216         # Create second identity provider (without remote_ids)
 1217         body = self.default_body.copy()
 1218         default_resp = self._create_default_idp(body=body)
 1219         default_idp = self._fetch_attribute_from_response(default_resp,
 1220                                                           'identity_provider')
 1221         idp_id = default_idp.get('id')
 1222         url = self.base_url(suffix=idp_id)
 1223 
 1224         body['remote_ids'] = [repeated_remote_id]
 1225         resp = self.patch(url, body={'identity_provider': body},
 1226                           expected_status=http.client.CONFLICT)
 1227         resp_data = jsonutils.loads(resp.body)
 1228         self.assertIn('Duplicate remote ID',
 1229                       resp_data['error']['message'])
 1230 
 1231     def test_update_idp_authorization_ttl(self):
 1232         body = self.default_body.copy()
 1233         body['authorization_ttl'] = 10080
 1234         default_resp = self._create_default_idp(body=body)
 1235         default_idp = self._fetch_attribute_from_response(default_resp,
 1236                                                           'identity_provider')
 1237         idp_id = default_idp.get('id')
 1238         url = self.base_url(suffix=idp_id)
 1239         self.assertIsNotNone(idp_id)
 1240 
 1241         body['authorization_ttl'] = None
 1242 
 1243         body = {'identity_provider': body}
 1244         resp = self.patch(url, body=body)
 1245         updated_idp = self._fetch_attribute_from_response(resp,
 1246                                                           'identity_provider')
 1247         body = body['identity_provider']
 1248         self.assertEqual(body['authorization_ttl'],
 1249                          updated_idp.get('authorization_ttl'))
 1250 
 1251         resp = self.get(url)
 1252         returned_idp = self._fetch_attribute_from_response(resp,
 1253                                                            'identity_provider')
 1254         self.assertEqual(body['authorization_ttl'],
 1255                          returned_idp.get('authorization_ttl'))
 1256 
 1257     def test_list_head_idps(self, iterations=5):
 1258         """List all available IdentityProviders.
 1259 
 1260         This test collects ids of created IdPs and
 1261         intersects it with the list of all available IdPs.
 1262         List of all IdPs can be a superset of IdPs created in this test,
 1263         because other tests also create IdPs.
 1264 
 1265         """
 1266         def get_id(resp):
 1267             r = self._fetch_attribute_from_response(resp,
 1268                                                     'identity_provider')
 1269             return r.get('id')
 1270 
 1271         ids = []
 1272         for _ in range(iterations):
 1273             id = get_id(self._create_default_idp())
 1274             ids.append(id)
 1275         ids = set(ids)
 1276 
 1277         keys_to_check = self.idp_keys
 1278         keys_to_check.append('domain_id')
 1279         url = self.base_url()
 1280         resp = self.get(url)
 1281         self.assertValidListResponse(resp, 'identity_providers',
 1282                                      dummy_validator,
 1283                                      keys_to_check=keys_to_check)
 1284         entities = self._fetch_attribute_from_response(resp,
 1285                                                        'identity_providers')
 1286         entities_ids = set([e['id'] for e in entities])
 1287         ids_intersection = entities_ids.intersection(ids)
 1288         self.assertEqual(ids_intersection, ids)
 1289 
 1290         self.head(url, expected_status=http.client.OK)
 1291 
 1292     def test_filter_list_head_idp_by_id(self):
 1293         def get_id(resp):
 1294             r = self._fetch_attribute_from_response(resp,
 1295                                                     'identity_provider')
 1296             return r.get('id')
 1297 
 1298         idp1_id = get_id(self._create_default_idp())
 1299         idp2_id = get_id(self._create_default_idp())
 1300 
 1301         # list the IdP, should get two IdP.
 1302         url = self.base_url()
 1303         resp = self.get(url)
 1304         entities = self._fetch_attribute_from_response(resp,
 1305                                                        'identity_providers')
 1306         entities_ids = [e['id'] for e in entities]
 1307         self.assertItemsEqual(entities_ids, [idp1_id, idp2_id])
 1308 
 1309         # filter the IdP by ID.
 1310         url = self.base_url() + '?id=' + idp1_id
 1311         resp = self.get(url)
 1312         filtered_service_list = resp.json['identity_providers']
 1313         self.assertThat(filtered_service_list, matchers.HasLength(1))
 1314         self.assertEqual(idp1_id, filtered_service_list[0].get('id'))
 1315 
 1316         self.head(url, expected_status=http.client.OK)
 1317 
 1318     def test_filter_list_head_idp_by_enabled(self):
 1319         def get_id(resp):
 1320             r = self._fetch_attribute_from_response(resp,
 1321                                                     'identity_provider')
 1322             return r.get('id')
 1323 
 1324         idp1_id = get_id(self._create_default_idp())
 1325 
 1326         body = self.default_body.copy()
 1327         body['enabled'] = False
 1328         idp2_id = get_id(self._create_default_idp(body=body))
 1329 
 1330         # list the IdP, should get two IdP.
 1331         url = self.base_url()
 1332         resp = self.get(url)
 1333         entities = self._fetch_attribute_from_response(resp,
 1334                                                        'identity_providers')
 1335         entities_ids = [e['id'] for e in entities]
 1336         self.assertItemsEqual(entities_ids, [idp1_id, idp2_id])
 1337 
 1338         # filter the IdP by 'enabled'.
 1339         url = self.base_url() + '?enabled=True'
 1340         resp = self.get(url)
 1341         filtered_service_list = resp.json['identity_providers']
 1342         self.assertThat(filtered_service_list, matchers.HasLength(1))
 1343         self.assertEqual(idp1_id, filtered_service_list[0].get('id'))
 1344 
 1345         self.head(url, expected_status=http.client.OK)
 1346 
 1347     def test_check_idp_uniqueness(self):
 1348         """Add same IdP twice.
 1349 
 1350         Expect HTTP 409 Conflict code for the latter call.
 1351 
 1352         """
 1353         url = self.base_url(suffix=uuid.uuid4().hex)
 1354         body = self._http_idp_input()
 1355         domain = unit.new_domain_ref()
 1356         PROVIDERS.resource_api.create_domain(domain['id'], domain)
 1357         body['domain_id'] = domain['id']
 1358         self.put(url, body={'identity_provider': body},
 1359                  expected_status=http.client.CREATED)
 1360         resp = self.put(url, body={'identity_provider': body},
 1361                         expected_status=http.client.CONFLICT)
 1362 
 1363         resp_data = jsonutils.loads(resp.body)
 1364         self.assertIn('Duplicate entry',
 1365                       resp_data.get('error', {}).get('message'))
 1366 
 1367     def test_get_head_idp(self):
 1368         """Create and later fetch IdP."""
 1369         body = self._http_idp_input()
 1370         domain = unit.new_domain_ref()
 1371         PROVIDERS.resource_api.create_domain(domain['id'], domain)
 1372         body['domain_id'] = domain['id']
 1373         default_resp = self._create_default_idp(body=body)
 1374         default_idp = self._fetch_attribute_from_response(default_resp,
 1375                                                           'identity_provider')
 1376         idp_id = default_idp.get('id')
 1377         url = self.base_url(suffix=idp_id)
 1378         resp = self.get(url)
 1379         # Strip keys out of `body` dictionary. This is done
 1380         # to be python 3 compatible
 1381         body_keys = list(body)
 1382         self.assertValidResponse(resp, 'identity_provider',
 1383                                  dummy_validator, keys_to_check=body_keys,
 1384                                  ref=body)
 1385 
 1386         self.head(url, expected_status=http.client.OK)
 1387 
 1388     def test_get_nonexisting_idp(self):
 1389         """Fetch nonexisting IdP entity.
 1390 
 1391         Expected HTTP 404 Not Found status code.
 1392 
 1393         """
 1394         idp_id = uuid.uuid4().hex
 1395         self.assertIsNotNone(idp_id)
 1396 
 1397         url = self.base_url(suffix=idp_id)
 1398         self.get(url, expected_status=http.client.NOT_FOUND)
 1399 
 1400     def test_delete_existing_idp(self):
 1401         """Create and later delete IdP.
 1402 
 1403         Expect HTTP 404 Not Found for the GET IdP call.
 1404         """
 1405         default_resp = self._create_default_idp()
 1406         default_idp = self._fetch_attribute_from_response(default_resp,
 1407                                                           'identity_provider')
 1408         idp_id = default_idp.get('id')
 1409         self.assertIsNotNone(idp_id)
 1410         url = self.base_url(suffix=idp_id)
 1411         self.delete(url)
 1412         self.get(url, expected_status=http.client.NOT_FOUND)
 1413 
 1414     def test_delete_idp_also_deletes_assigned_protocols(self):
 1415         """Deleting an IdP will delete its assigned protocol."""
 1416         # create default IdP
 1417         default_resp = self._create_default_idp()
 1418         default_idp = self._fetch_attribute_from_response(default_resp,
 1419                                                           'identity_provider')
 1420         idp_id = default_idp['id']
 1421         protocol_id = uuid.uuid4().hex
 1422 
 1423         url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s')
 1424         idp_url = self.base_url(suffix=idp_id)
 1425 
 1426         # assign protocol to IdP
 1427         kwargs = {'expected_status': http.client.CREATED}
 1428         resp, idp_id, proto = self._assign_protocol_to_idp(
 1429             url=url,
 1430             idp_id=idp_id,
 1431             proto=protocol_id,
 1432             **kwargs)
 1433 
 1434         # removing IdP will remove the assigned protocol as well
 1435         self.assertEqual(
 1436             1, len(PROVIDERS.federation_api.list_protocols(idp_id))
 1437         )
 1438         self.delete(idp_url)
 1439         self.get(idp_url, expected_status=http.client.NOT_FOUND)
 1440         self.assertEqual(
 1441             0, len(PROVIDERS.federation_api.list_protocols(idp_id))
 1442         )
 1443 
 1444     def test_delete_nonexisting_idp(self):
 1445         """Delete nonexisting IdP.
 1446 
 1447         Expect HTTP 404 Not Found for the GET IdP call.
 1448         """
 1449         idp_id = uuid.uuid4().hex
 1450         url = self.base_url(suffix=idp_id)
 1451         self.delete(url, expected_status=http.client.NOT_FOUND)
 1452 
 1453     def test_update_idp_mutable_attributes(self):
 1454         """Update IdP's mutable parameters."""
 1455         default_resp = self._create_default_idp()
 1456         default_idp = self._fetch_attribute_from_response(default_resp,
 1457                                                           'identity_provider')
 1458         idp_id = default_idp.get('id')
 1459         url = self.base_url(suffix=idp_id)
 1460         self.assertIsNotNone(idp_id)
 1461 
 1462         _enabled = not default_idp.get('enabled')
 1463         body = {'remote_ids': [uuid.uuid4().hex, uuid.uuid4().hex],
 1464                 'description': uuid.uuid4().hex,
 1465                 'enabled': _enabled}
 1466 
 1467         body = {'identity_provider': body}
 1468         resp = self.patch(url, body=body)
 1469         updated_idp = self._fetch_attribute_from_response(resp,
 1470                                                           'identity_provider')
 1471         body = body['identity_provider']
 1472         for key in body.keys():
 1473             if isinstance(body[key], list):
 1474                 self.assertEqual(sorted(body[key]),
 1475                                  sorted(updated_idp.get(key)))
 1476             else:
 1477                 self.assertEqual(body[key], updated_idp.get(key))
 1478 
 1479         resp = self.get(url)
 1480         updated_idp = self._fetch_attribute_from_response(resp,
 1481                                                           'identity_provider')
 1482         for key in body.keys():
 1483             if isinstance(body[key], list):
 1484                 self.assertEqual(sorted(body[key]),
 1485                                  sorted(updated_idp.get(key)))
 1486             else:
 1487                 self.assertEqual(body[key], updated_idp.get(key))
 1488 
 1489     def test_update_idp_immutable_attributes(self):
 1490         """Update IdP's immutable parameters.
 1491 
 1492         Expect HTTP BAD REQUEST.
 1493 
 1494         """
 1495         default_resp = self._create_default_idp()
 1496         default_idp = self._fetch_attribute_from_response(default_resp,
 1497                                                           'identity_provider')
 1498         idp_id = default_idp.get('id')
 1499         self.assertIsNotNone(idp_id)
 1500 
 1501         body = self._http_idp_input()
 1502         body['id'] = uuid.uuid4().hex
 1503         body['protocols'] = [uuid.uuid4().hex, uuid.uuid4().hex]
 1504 
 1505         url = self.base_url(suffix=idp_id)
 1506         self.patch(url, body={'identity_provider': body},
 1507                    expected_status=http.client.BAD_REQUEST)
 1508 
 1509     def test_update_nonexistent_idp(self):
 1510         """Update nonexistent IdP.
 1511 
 1512         Expect HTTP 404 Not Found code.
 1513 
 1514         """
 1515         idp_id = uuid.uuid4().hex
 1516         url = self.base_url(suffix=idp_id)
 1517         body = self._http_idp_input()
 1518         body['enabled'] = False
 1519         body = {'identity_provider': body}
 1520 
 1521         self.patch(url, body=body, expected_status=http.client.NOT_FOUND)
 1522 
 1523     def test_assign_protocol_to_idp(self):
 1524         """Assign a protocol to existing IdP."""
 1525         self._assign_protocol_to_idp(expected_status=http.client.CREATED)
 1526 
 1527     def test_protocol_composite_pk(self):
 1528         """Test that Keystone can add two entities.
 1529 
 1530         The entities have identical names, however, attached to different
 1531         IdPs.
 1532 
 1533         1. Add IdP and assign it protocol with predefined name
 1534         2. Add another IdP and assign it a protocol with same name.
 1535 
 1536         Expect HTTP 201 code
 1537 
 1538         """
 1539         url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s')
 1540 
 1541         kwargs = {'expected_status': http.client.CREATED}
 1542         self._assign_protocol_to_idp(proto='saml2',
 1543                                      url=url, **kwargs)
 1544 
 1545         self._assign_protocol_to_idp(proto='saml2',
 1546                                      url=url, **kwargs)
 1547 
 1548     def test_protocol_idp_pk_uniqueness(self):
 1549         """Test whether Keystone checks for unique idp/protocol values.
 1550 
 1551         Add same protocol twice, expect Keystone to reject a latter call and
 1552         return HTTP 409 Conflict code.
 1553 
 1554         """
 1555         url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s')
 1556 
 1557         kwargs = {'expected_status': http.client.CREATED}
 1558         resp, idp_id, proto = self._assign_protocol_to_idp(proto='saml2',
 1559                                                            url=url, **kwargs)
 1560         kwargs = {'expected_status': http.client.CONFLICT}
 1561         self._assign_protocol_to_idp(
 1562             idp_id=idp_id, proto='saml2', validate=False, url=url, **kwargs
 1563         )
 1564 
 1565     def test_assign_protocol_to_nonexistent_idp(self):
 1566         """Assign protocol to IdP that doesn't exist.
 1567 
 1568         Expect HTTP 404 Not Found code.
 1569 
 1570         """
 1571         idp_id = uuid.uuid4().hex
 1572         kwargs = {'expected_status': http.client.NOT_FOUND}
 1573         self._assign_protocol_to_idp(proto='saml2',
 1574                                      idp_id=idp_id,
 1575                                      validate=False,
 1576                                      **kwargs)
 1577 
 1578     def test_crud_protocol_without_protocol_id_in_url(self):
 1579         # NOTE(morgan): This test is redundant but is added to ensure
 1580         # the url routing error in bug 1817313 is explicitly covered.
 1581         # create a protocol, but do not put the ID in the URL
 1582         idp_id, _ = self._create_and_decapsulate_response()
 1583         mapping_id = uuid.uuid4().hex
 1584         self._create_mapping(mapping_id=mapping_id)
 1585         protocol = {
 1586             'id': uuid.uuid4().hex,
 1587             'mapping_id': mapping_id
 1588         }
 1589         with self.test_client() as c:
 1590             token = self.get_scoped_token()
 1591             # DELETE/PATCH/PUT on non-trailing `/` results in
 1592             # METHOD_NOT_ALLOWED
 1593             c.delete('/v3/OS-FEDERATION/identity_providers/%(idp_id)s'
 1594                      '/protocols' % {'idp_id': idp_id},
 1595                      headers={'X-Auth-Token': token},
 1596                      expected_status_code=http.client.METHOD_NOT_ALLOWED)
 1597             c.patch('/v3/OS-FEDERATION/identity_providers/%(idp_id)s'
 1598                     '/protocols/' % {'idp_id': idp_id},
 1599                     json={'protocol': protocol},
 1600                     headers={'X-Auth-Token': token},
 1601                     expected_status_code=http.client.METHOD_NOT_ALLOWED)
 1602             c.put('/v3/OS-FEDERATION/identity_providers/%(idp_id)s'
 1603                   '/protocols' % {'idp_id': idp_id},
 1604                   json={'protocol': protocol},
 1605                   headers={'X-Auth-Token': token},
 1606                   expected_status_code=http.client.METHOD_NOT_ALLOWED)
 1607 
 1608             # DELETE/PATCH/PUT should raise 405 with trailing '/', it is
 1609             # remapped to without the trailing '/' by the normalization
 1610             # middleware.
 1611             c.delete('/v3/OS-FEDERATION/identity_providers/%(idp_id)s'
 1612                      '/protocols/' % {'idp_id': idp_id},
 1613                      headers={'X-Auth-Token': token},
 1614                      expected_status_code=http.client.METHOD_NOT_ALLOWED)
 1615             c.patch('/v3/OS-FEDERATION/identity_providers/%(idp_id)s'
 1616                     '/protocols/' % {'idp_id': idp_id},
 1617                     json={'protocol': protocol},
 1618                     headers={'X-Auth-Token': token},
 1619                     expected_status_code=http.client.METHOD_NOT_ALLOWED)
 1620             c.put('/v3/OS-FEDERATION/identity_providers/%(idp_id)s'
 1621                   '/protocols/' % {'idp_id': idp_id},
 1622                   json={'protocol': protocol},
 1623                   headers={'X-Auth-Token': token},
 1624                   expected_status_code=http.client.METHOD_NOT_ALLOWED)
 1625 
 1626     def test_get_head_protocol(self):
 1627         """Create and later fetch protocol tied to IdP."""
 1628         resp, idp_id, proto = self._assign_protocol_to_idp(
 1629             expected_status=http.client.CREATED)
 1630         proto_id = self._fetch_attribute_from_response(resp, 'protocol')['id']
 1631         url = "%s/protocols/%s" % (idp_id, proto_id)
 1632         url = self.base_url(suffix=url)
 1633 
 1634         resp = self.get(url)
 1635 
 1636         reference = {'id': proto_id}
 1637         # Strip keys out of `body` dictionary. This is done
 1638         # to be python 3 compatible
 1639         reference_keys = list(reference)
 1640         self.assertValidResponse(resp, 'protocol',
 1641                                  dummy_validator,
 1642                                  keys_to_check=reference_keys,
 1643                                  ref=reference)
 1644 
 1645         self.head(url, expected_status=http.client.OK)
 1646 
 1647     def test_list_head_protocols(self):
 1648         """Create set of protocols and later list them.
 1649 
 1650         Compare input and output id sets.
 1651 
 1652         """
 1653         resp, idp_id, proto = self._assign_protocol_to_idp(
 1654             expected_status=http.client.CREATED)
 1655         iterations = random.randint(0, 16)
 1656         protocol_ids = []
 1657         for _ in range(iterations):
 1658             resp, _, proto = self._assign_protocol_to_idp(
 1659                 idp_id=idp_id,
 1660                 expected_status=http.client.CREATED)
 1661             proto_id = self._fetch_attribute_from_response(resp, 'protocol')
 1662             proto_id = proto_id['id']
 1663             protocol_ids.append(proto_id)
 1664 
 1665         url = "%s/protocols" % idp_id
 1666         url = self.base_url(suffix=url)
 1667         resp = self.get(url)
 1668         self.assertValidListResponse(resp, 'protocols',
 1669                                      dummy_validator,
 1670                                      keys_to_check=['id'])
 1671         entities = self._fetch_attribute_from_response(resp, 'protocols')
 1672         entities = set([entity['id'] for entity in entities])
 1673         protocols_intersection = entities.intersection(protocol_ids)
 1674         self.assertEqual(protocols_intersection, set(protocol_ids))
 1675 
 1676         self.head(url, expected_status=http.client.OK)
 1677 
 1678     def test_update_protocols_attribute(self):
 1679         """Update protocol's attribute."""
 1680         resp, idp_id, proto = self._assign_protocol_to_idp(
 1681             expected_status=http.client.CREATED)
 1682         new_mapping_id = uuid.uuid4().hex
 1683         self._create_mapping(mapping_id=new_mapping_id)
 1684 
 1685         url = "%s/protocols/%s" % (idp_id, proto)
 1686         url = self.base_url(suffix=url)
 1687         body = {'mapping_id': new_mapping_id}
 1688         resp = self.patch(url, body={'protocol': body})
 1689         self.assertValidResponse(resp, 'protocol', dummy_validator,
 1690                                  keys_to_check=['id', 'mapping_id'],
 1691                                  ref={'id': proto,
 1692                                       'mapping_id': new_mapping_id}
 1693                                  )
 1694 
 1695     def test_delete_protocol(self):
 1696         """Delete protocol.
 1697 
 1698         Expect HTTP 404 Not Found code for the GET call after the protocol is
 1699         deleted.
 1700 
 1701         """
 1702         url = self.base_url(suffix='%(idp_id)s/'
 1703                                    'protocols/%(protocol_id)s')
 1704         resp, idp_id, proto = self._assign_protocol_to_idp(
 1705             expected_status=http.client.CREATED)
 1706         url = url % {'idp_id': idp_id,
 1707                      'protocol_id': proto}
 1708         self.delete(url)
 1709         self.get(url, expected_status=http.client.NOT_FOUND)
 1710 
 1711 
 1712 class MappingCRUDTests(test_v3.RestfulTestCase):
 1713     """A class for testing CRUD operations for Mappings."""
 1714 
 1715     MAPPING_URL = '/OS-FEDERATION/mappings/'
 1716 
 1717     def assertValidMappingListResponse(self, resp, *args, **kwargs):
 1718         return self.assertValidListResponse(
 1719             resp,
 1720             'mappings',
 1721             self.assertValidMapping,
 1722             keys_to_check=[],
 1723             *args,
 1724             **kwargs)
 1725 
 1726     def assertValidMappingResponse(self, resp, *args, **kwargs):
 1727         return self.assertValidResponse(
 1728             resp,
 1729             'mapping',
 1730             self.assertValidMapping,
 1731             keys_to_check=[],
 1732             *args,
 1733             **kwargs)
 1734 
 1735     def assertValidMapping(self, entity, ref=None):
 1736         self.assertIsNotNone(entity.get('id'))
 1737         self.assertIsNotNone(entity.get('rules'))
 1738         if ref:
 1739             self.assertEqual(entity['rules'], ref['rules'])
 1740         return entity
 1741 
 1742     def _create_default_mapping_entry(self):
 1743         url = self.MAPPING_URL + uuid.uuid4().hex
 1744         resp = self.put(url,
 1745                         body={'mapping': mapping_fixtures.MAPPING_LARGE},
 1746                         expected_status=http.client.CREATED)
 1747         return resp
 1748 
 1749     def _get_id_from_response(self, resp):
 1750         r = resp.result.get('mapping')
 1751         return r.get('id')
 1752 
 1753     def test_mapping_create(self):
 1754         resp = self._create_default_mapping_entry()
 1755         self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_LARGE)
 1756 
 1757     def test_mapping_list_head(self):
 1758         url = self.MAPPING_URL
 1759         self._create_default_mapping_entry()
 1760         resp = self.get(url)
 1761         entities = resp.result.get('mappings')
 1762         self.assertIsNotNone(entities)
 1763         self.assertResponseStatus(resp, http.client.OK)
 1764         self.assertValidListLinks(resp.result.get('links'))
 1765         self.assertEqual(1, len(entities))
 1766         self.head(url, expected_status=http.client.OK)
 1767 
 1768     def test_mapping_delete(self):
 1769         url = self.MAPPING_URL + '%(mapping_id)s'
 1770         resp = self._create_default_mapping_entry()
 1771         mapping_id = self._get_id_from_response(resp)
 1772         url = url % {'mapping_id': str(mapping_id)}
 1773         resp = self.delete(url)
 1774         self.assertResponseStatus(resp, http.client.NO_CONTENT)
 1775         self.get(url, expected_status=http.client.NOT_FOUND)
 1776 
 1777     def test_mapping_get_head(self):
 1778         url = self.MAPPING_URL + '%(mapping_id)s'
 1779         resp = self._create_default_mapping_entry()
 1780         mapping_id = self._get_id_from_response(resp)
 1781         url = url % {'mapping_id': mapping_id}
 1782         resp = self.get(url)
 1783         self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_LARGE)
 1784         self.head(url, expected_status=http.client.OK)
 1785 
 1786     def test_mapping_update(self):
 1787         url = self.MAPPING_URL + '%(mapping_id)s'
 1788         resp = self._create_default_mapping_entry()
 1789         mapping_id = self._get_id_from_response(resp)
 1790         url = url % {'mapping_id': mapping_id}
 1791         resp = self.patch(url,
 1792                           body={'mapping': mapping_fixtures.MAPPING_SMALL})
 1793         self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_SMALL)
 1794         resp = self.get(url)
 1795         self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_SMALL)
 1796 
 1797     def test_delete_mapping_dne(self):
 1798         url = self.MAPPING_URL + uuid.uuid4().hex
 1799         self.delete(url, expected_status=http.client.NOT_FOUND)
 1800 
 1801     def test_get_mapping_dne(self):
 1802         url = self.MAPPING_URL + uuid.uuid4().hex
 1803         self.get(url, expected_status=http.client.NOT_FOUND)
 1804 
 1805     def test_create_mapping_bad_requirements(self):
 1806         url = self.MAPPING_URL + uuid.uuid4().hex
 1807         self.put(url, expected_status=http.client.BAD_REQUEST,
 1808                  body={'mapping': mapping_fixtures.MAPPING_BAD_REQ})
 1809 
 1810     def test_create_mapping_no_rules(self):
 1811         url = self.MAPPING_URL + uuid.uuid4().hex
 1812         self.put(url, expected_status=http.client.BAD_REQUEST,
 1813                  body={'mapping': mapping_fixtures.MAPPING_NO_RULES})
 1814 
 1815     def test_create_mapping_no_remote_objects(self):
 1816         url = self.MAPPING_URL + uuid.uuid4().hex
 1817         self.put(url, expected_status=http.client.BAD_REQUEST,
 1818                  body={'mapping': mapping_fixtures.MAPPING_NO_REMOTE})
 1819 
 1820     def test_create_mapping_bad_value(self):
 1821         url = self.MAPPING_URL + uuid.uuid4().hex
 1822         self.put(url, expected_status=http.client.BAD_REQUEST,
 1823                  body={'mapping': mapping_fixtures.MAPPING_BAD_VALUE})
 1824 
 1825     def test_create_mapping_missing_local(self):
 1826         url = self.MAPPING_URL + uuid.uuid4().hex
 1827         self.put(url, expected_status=http.client.BAD_REQUEST,
 1828                  body={'mapping': mapping_fixtures.MAPPING_MISSING_LOCAL})
 1829 
 1830     def test_create_mapping_missing_type(self):
 1831         url = self.MAPPING_URL + uuid.uuid4().hex
 1832         self.put(url, expected_status=http.client.BAD_REQUEST,
 1833                  body={'mapping': mapping_fixtures.MAPPING_MISSING_TYPE})
 1834 
 1835     def test_create_mapping_wrong_type(self):
 1836         url = self.MAPPING_URL + uuid.uuid4().hex
 1837         self.put(url, expected_status=http.client.BAD_REQUEST,
 1838                  body={'mapping': mapping_fixtures.MAPPING_WRONG_TYPE})
 1839 
 1840     def test_create_mapping_extra_remote_properties_not_any_of(self):
 1841         url = self.MAPPING_URL + uuid.uuid4().hex
 1842         mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_NOT_ANY_OF
 1843         self.put(url, expected_status=http.client.BAD_REQUEST,
 1844                  body={'mapping': mapping})
 1845 
 1846     def test_create_mapping_extra_remote_properties_any_one_of(self):
 1847         url = self.MAPPING_URL + uuid.uuid4().hex
 1848         mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_ANY_ONE_OF
 1849         self.put(url, expected_status=http.client.BAD_REQUEST,
 1850                  body={'mapping': mapping})
 1851 
 1852     def test_create_mapping_extra_remote_properties_just_type(self):
 1853         url = self.MAPPING_URL + uuid.uuid4().hex
 1854         mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_JUST_TYPE
 1855         self.put(url, expected_status=http.client.BAD_REQUEST,
 1856                  body={'mapping': mapping})
 1857 
 1858     def test_create_mapping_empty_map(self):
 1859         url = self.MAPPING_URL + uuid.uuid4().hex
 1860         self.put(url, expected_status=http.client.BAD_REQUEST,
 1861                  body={'mapping': {}})
 1862 
 1863     def test_create_mapping_extra_rules_properties(self):
 1864         url = self.MAPPING_URL + uuid.uuid4().hex
 1865         self.put(url, expected_status=http.client.BAD_REQUEST,
 1866                  body={'mapping': mapping_fixtures.MAPPING_EXTRA_RULES_PROPS})
 1867 
 1868     def test_create_mapping_with_blacklist_and_whitelist(self):
 1869         """Test for adding whitelist and blacklist in the rule.
 1870 
 1871         Server should respond with HTTP 400 Bad Request error upon discovering
 1872         both ``whitelist`` and ``blacklist`` keywords in the same rule.
 1873 
 1874         """
 1875         url = self.MAPPING_URL + uuid.uuid4().hex
 1876         mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST_AND_BLACKLIST
 1877         self.put(url, expected_status=http.client.BAD_REQUEST,
 1878                  body={'mapping': mapping})
 1879 
 1880     def test_create_mapping_with_local_user_and_local_domain(self):
 1881         url = self.MAPPING_URL + uuid.uuid4().hex
 1882         resp = self.put(
 1883             url,
 1884             body={
 1885                 'mapping': mapping_fixtures.MAPPING_LOCAL_USER_LOCAL_DOMAIN
 1886             },
 1887             expected_status=http.client.CREATED)
 1888         self.assertValidMappingResponse(
 1889             resp, mapping_fixtures.MAPPING_LOCAL_USER_LOCAL_DOMAIN)
 1890 
 1891     def test_create_mapping_with_ephemeral(self):
 1892         url = self.MAPPING_URL + uuid.uuid4().hex
 1893         resp = self.put(
 1894             url,
 1895             body={'mapping': mapping_fixtures.MAPPING_EPHEMERAL_USER},
 1896             expected_status=http.client.CREATED)
 1897         self.assertValidMappingResponse(
 1898             resp, mapping_fixtures.MAPPING_EPHEMERAL_USER)
 1899 
 1900     def test_create_mapping_with_bad_user_type(self):
 1901         url = self.MAPPING_URL + uuid.uuid4().hex
 1902         # get a copy of a known good map
 1903         bad_mapping = copy.deepcopy(mapping_fixtures.MAPPING_EPHEMERAL_USER)
 1904         # now sabotage the user type
 1905         bad_mapping['rules'][0]['local'][0]['user']['type'] = uuid.uuid4().hex
 1906         self.put(url, expected_status=http.client.BAD_REQUEST,
 1907                  body={'mapping': bad_mapping})
 1908 
 1909     def test_create_shadow_mapping_without_roles_fails(self):
 1910         """Validate that mappings with projects contain roles when created."""
 1911         url = self.MAPPING_URL + uuid.uuid4().hex
 1912         self.put(
 1913             url,
 1914             body={'mapping': mapping_fixtures.MAPPING_PROJECTS_WITHOUT_ROLES},
 1915             expected_status=http.client.BAD_REQUEST
 1916         )
 1917 
 1918     def test_update_shadow_mapping_without_roles_fails(self):
 1919         """Validate that mappings with projects contain roles when updated."""
 1920         url = self.MAPPING_URL + uuid.uuid4().hex
 1921         resp = self.put(
 1922             url,
 1923             body={'mapping': mapping_fixtures.MAPPING_PROJECTS},
 1924             expected_status=http.client.CREATED
 1925         )
 1926         self.assertValidMappingResponse(
 1927             resp, mapping_fixtures.MAPPING_PROJECTS
 1928         )
 1929         self.patch(
 1930             url,
 1931             body={'mapping': mapping_fixtures.MAPPING_PROJECTS_WITHOUT_ROLES},
 1932             expected_status=http.client.BAD_REQUEST
 1933         )
 1934 
 1935     def test_create_shadow_mapping_without_name_fails(self):
 1936         """Validate project mappings contain the project name when created."""
 1937         url = self.MAPPING_URL + uuid.uuid4().hex
 1938         self.put(
 1939             url,
 1940             body={'mapping': mapping_fixtures.MAPPING_PROJECTS_WITHOUT_NAME},
 1941             expected_status=http.client.BAD_REQUEST
 1942         )
 1943 
 1944     def test_update_shadow_mapping_without_name_fails(self):
 1945         """Validate project mappings contain the project name when updated."""
 1946         url = self.MAPPING_URL + uuid.uuid4().hex
 1947         resp = self.put(
 1948             url,
 1949             body={'mapping': mapping_fixtures.MAPPING_PROJECTS},
 1950             expected_status=http.client.CREATED
 1951         )
 1952         self.assertValidMappingResponse(
 1953             resp, mapping_fixtures.MAPPING_PROJECTS
 1954         )
 1955         self.patch(
 1956             url,
 1957             body={'mapping': mapping_fixtures.MAPPING_PROJECTS_WITHOUT_NAME},
 1958             expected_status=http.client.BAD_REQUEST
 1959         )
 1960 
 1961 
 1962 class FederatedTokenTests(test_v3.RestfulTestCase, FederatedSetupMixin):
 1963 
 1964     def auth_plugin_config_override(self):
 1965         methods = ['saml2', 'token']
 1966         super(FederatedTokenTests, self).auth_plugin_config_override(methods)
 1967 
 1968     def setUp(self):
 1969         super(FederatedTokenTests, self).setUp()
 1970         self._notifications = []
 1971 
 1972         def fake_saml_notify(action, user_id, group_ids,
 1973                              identity_provider, protocol, token_id, outcome):
 1974             note = {
 1975                 'action': action,
 1976                 'user_id': user_id,
 1977                 'identity_provider': identity_provider,
 1978                 'protocol': protocol,
 1979                 'send_notification_called': True}
 1980             self._notifications.append(note)
 1981 
 1982         self.useFixture(fixtures.MockPatchObject(
 1983             notifications,
 1984             'send_saml_audit_notification',
 1985             fake_saml_notify))
 1986 
 1987     def _assert_last_notify(self, action, identity_provider, protocol,
 1988                             user_id=None):
 1989         self.assertTrue(self._notifications)
 1990         note = self._notifications[-1]
 1991         if user_id:
 1992             self.assertEqual(note['user_id'], user_id)
 1993         self.assertEqual(note['action'], action)
 1994         self.assertEqual(note['identity_provider'], identity_provider)
 1995         self.assertEqual(note['protocol'], protocol)
 1996         self.assertTrue(note['send_notification_called'])
 1997 
 1998     def load_fixtures(self, fixtures):
 1999         super(FederatedTokenTests, self).load_fixtures(fixtures)
 2000         self.load_federation_sample_data()
 2001 
 2002     def test_issue_unscoped_token_notify(self):
 2003         self._issue_unscoped_token()
 2004         self._assert_last_notify(self.ACTION, self.IDP, self.PROTOCOL)
 2005 
 2006     def test_issue_unscoped_token(self):
 2007         r = self._issue_unscoped_token()
 2008         token_resp = render_token.render_token_response_from_model(r)['token']
 2009         self.assertValidMappedUser(token_resp)
 2010 
 2011     def test_default_domain_scoped_token(self):
 2012         # Make sure federated users can get tokens scoped to the default
 2013         # domain, which has a non-uuid ID by default (e.g., `default`). We want
 2014         # to make sure the token provider handles string types properly if the
 2015         # ID isn't compressed into byte format during validation. Turn off
 2016         # cache on issue so that we validate the token online right after we
 2017         # get it to make sure the token provider is called.
 2018         self.config_fixture.config(group='token', cache_on_issue=False)
 2019 
 2020         # Grab an unscoped token to get a domain-scoped token with.
 2021         token = self._issue_unscoped_token()
 2022 
 2023         # Give the user a direct role assignment on the default domain, so they
 2024         # can get a federated domain-scoped token.
 2025         PROVIDERS.assignment_api.create_grant(
 2026             self.role_admin['id'], user_id=token.user_id,
 2027             domain_id=CONF.identity.default_domain_id
 2028         )
 2029 
 2030         # Get a token scoped to the default domain with an ID of `default`,
 2031         # which isn't a uuid type, but we should be able to handle it
 2032         # accordingly in the token formatters/providers.
 2033         auth_request = {
 2034             'auth': {
 2035                 'identity': {
 2036                     'methods': [
 2037                         'token'
 2038                     ],
 2039                     'token': {
 2040                         'id': token.id
 2041                     }
 2042                 },
 2043                 'scope': {
 2044                     'domain': {
 2045                         'id': CONF.identity.default_domain_id
 2046                     }
 2047                 }
 2048             }
 2049         }
 2050         r = self.v3_create_token(auth_request)
 2051         domain_scoped_token_id = r.headers.get('X-Subject-Token')
 2052 
 2053         # Validate the token to make sure the token providers handle non-uuid
 2054         # domain IDs properly.
 2055         headers = {'X-Subject-Token': domain_scoped_token_id}
 2056         self.get(
 2057             '/auth/tokens',
 2058             token=domain_scoped_token_id,
 2059             headers=headers
 2060         )
 2061 
 2062     def test_issue_the_same_unscoped_token_with_user_deleted(self):
 2063         r = self._issue_unscoped_token()
 2064         token = render_token.render_token_response_from_model(r)['token']
 2065         user1 = token['user']
 2066         user_id1 = user1.pop('id')
 2067 
 2068         # delete the referenced user, and authenticate again. Keystone should
 2069         # create another new shadow user.
 2070         PROVIDERS.identity_api.delete_user(user_id1)
 2071 
 2072         r = self._issue_unscoped_token()
 2073         token = render_token.render_token_response_from_model(r)['token']
 2074         user2 = token['user']
 2075         user_id2 = user2.pop('id')
 2076 
 2077         # Only the user_id is different. Other properties include
 2078         # identity_provider, protocol, groups and domain are the same.
 2079         self.assertIsNot(user_id2, user_id1)
 2080         self.assertEqual(user1, user2)
 2081 
 2082     def test_issue_unscoped_token_disabled_idp(self):
 2083         """Check if authentication works with disabled identity providers.
 2084 
 2085         Test plan:
 2086         1) Disable default IdP
 2087         2) Try issuing unscoped token for that IdP
 2088         3) Expect server to forbid authentication
 2089 
 2090         """
 2091         enabled_false = {'enabled': False}
 2092         PROVIDERS.federation_api.update_idp(self.IDP, enabled_false)
 2093         self.assertRaises(exception.Forbidden,
 2094                           self._issue_unscoped_token)
 2095 
 2096     def test_issue_unscoped_token_group_names_in_mapping(self):
 2097         r = self._issue_unscoped_token(assertion='ANOTHER_CUSTOMER_ASSERTION')
 2098         ref_groups = set([self.group_customers['id'], self.group_admins['id']])
 2099         token_groups = r.federated_groups
 2100         token_groups = set([group['id'] for group in token_groups])
 2101         self.assertEqual(ref_groups, token_groups)
 2102 
 2103     def test_issue_unscoped_tokens_nonexisting_group(self):
 2104         self._issue_unscoped_token(assertion='ANOTHER_TESTER_ASSERTION')
 2105 
 2106     def test_issue_unscoped_token_with_remote_no_attribute(self):
 2107         self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE,
 2108                                    environment={
 2109                                        self.REMOTE_ID_ATTR:
 2110                                            self.REMOTE_IDS[0]
 2111                                    })
 2112 
 2113     def test_issue_unscoped_token_with_remote(self):
 2114         self.config_fixture.config(group='federation',
 2115                                    remote_id_attribute=self.REMOTE_ID_ATTR)
 2116         self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE,
 2117                                    environment={
 2118                                        self.REMOTE_ID_ATTR:
 2119                                            self.REMOTE_IDS[0]
 2120                                    })
 2121 
 2122     def test_issue_unscoped_token_with_saml2_remote(self):
 2123         self.config_fixture.config(group='saml2',
 2124                                    remote_id_attribute=self.REMOTE_ID_ATTR)
 2125         self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE,
 2126                                    environment={
 2127                                        self.REMOTE_ID_ATTR:
 2128                                            self.REMOTE_IDS[0]
 2129                                    })
 2130 
 2131     def test_issue_unscoped_token_with_remote_different(self):
 2132         self.config_fixture.config(group='federation',
 2133                                    remote_id_attribute=self.REMOTE_ID_ATTR)
 2134         self.assertRaises(exception.Forbidden,
 2135                           self._issue_unscoped_token,
 2136                           idp=self.IDP_WITH_REMOTE,
 2137                           environment={
 2138                               self.REMOTE_ID_ATTR: uuid.uuid4().hex
 2139                           })
 2140 
 2141     def test_issue_unscoped_token_with_remote_default_overwritten(self):
 2142         """Test that protocol remote_id_attribute has higher priority.
 2143 
 2144         Make sure the parameter stored under ``protocol`` section has higher
 2145         priority over parameter from default ``federation`` configuration
 2146         section.
 2147 
 2148         """
 2149         self.config_fixture.config(group='saml2',
 2150                                    remote_id_attribute=self.REMOTE_ID_ATTR)
 2151         self.config_fixture.config(group='federation',
 2152                                    remote_id_attribute=uuid.uuid4().hex)
 2153         self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE,
 2154                                    environment={
 2155                                        self.REMOTE_ID_ATTR:
 2156                                            self.REMOTE_IDS[0]
 2157                                    })
 2158 
 2159     def test_issue_unscoped_token_with_remote_unavailable(self):
 2160         self.config_fixture.config(group='federation',
 2161                                    remote_id_attribute=self.REMOTE_ID_ATTR)
 2162         self.assertRaises(exception.Unauthorized,
 2163                           self._issue_unscoped_token,
 2164                           idp=self.IDP_WITH_REMOTE,
 2165                           environment={
 2166                               uuid.uuid4().hex: uuid.uuid4().hex
 2167                           })
 2168 
 2169     def test_issue_unscoped_token_with_remote_user_as_empty_string(self):
 2170         # make sure that REMOTE_USER set as the empty string won't interfere
 2171         self._issue_unscoped_token(environment={'REMOTE_USER': ''})
 2172 
 2173     def test_issue_unscoped_token_no_groups(self):
 2174         r = self._issue_unscoped_token(assertion='USER_NO_GROUPS_ASSERTION')
 2175         token_groups = r.federated_groups
 2176         self.assertEqual(0, len(token_groups))
 2177 
 2178     def test_issue_scoped_token_no_groups(self):
 2179         """Verify that token without groups cannot get scoped to project.
 2180 
 2181         This test is required because of bug 1677723.
 2182         """
 2183         # issue unscoped token with no groups
 2184         r = self._issue_unscoped_token(assertion='USER_NO_GROUPS_ASSERTION')
 2185         token_groups = r.federated_groups
 2186         self.assertEqual(0, len(token_groups))
 2187         unscoped_token = r.id
 2188 
 2189         # let admin get roles in a project
 2190         self.proj_employees
 2191         admin = unit.new_user_ref(CONF.identity.default_domain_id)
 2192         PROVIDERS.identity_api.create_user(admin)
 2193         PROVIDERS.assignment_api.create_grant(
 2194             self.role_admin['id'], user_id=admin['id'],
 2195             project_id=self.proj_employees['id']
 2196         )
 2197 
 2198         # try to scope the token. It should fail
 2199         scope = self._scope_request(
 2200             unscoped_token, 'project', self.proj_employees['id']
 2201         )
 2202         self.v3_create_token(
 2203             scope, expected_status=http.client.UNAUTHORIZED)
 2204 
 2205     def test_issue_unscoped_token_malformed_environment(self):
 2206         """Test whether non string objects are filtered out.
 2207 
 2208         Put non string objects into the environment, inject
 2209         correct assertion and try to get an unscoped token.
 2210         Expect server not to fail on using split() method on
 2211         non string objects and return token id in the HTTP header.
 2212 
 2213         """
 2214         environ = {
 2215             'malformed_object': object(),
 2216             'another_bad_idea': tuple(range(10)),
 2217             'yet_another_bad_param': dict(zip(uuid.uuid4().hex, range(32)))
 2218         }
 2219         environ.update(mapping_fixtures.EMPLOYEE_ASSERTION)
 2220         with self.make_request(environ=environ):
 2221             authentication.authenticate_for_token(self.UNSCOPED_V3_SAML2_REQ)
 2222 
 2223     def test_scope_to_project_once_notify(self):
 2224         r = self.v3_create_token(
 2225             self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE)
 2226         user_id = r.json['token']['user']['id']
 2227         self._assert_last_notify(self.ACTION, self.IDP, self.PROTOCOL, user_id)
 2228 
 2229     def test_scope_to_project_once(self):
 2230         r = self.v3_create_token(
 2231             self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE)
 2232         token_resp = r.result['token']
 2233         project_id = token_resp['project']['id']
 2234         self._check_project_scoped_token_attributes(token_resp, project_id)
 2235         roles_ref = [self.role_employee]
 2236 
 2237         projects_ref = self.proj_employees
 2238         self._check_projects_and_roles(token_resp, roles_ref, projects_ref)
 2239         self.assertValidMappedUser(token_resp)
 2240 
 2241     def test_scope_token_with_idp_disabled(self):
 2242         """Scope token issued by disabled IdP.
 2243 
 2244         Try scoping the token issued by an IdP which is disabled now. Expect
 2245         server to refuse scoping operation.
 2246 
 2247         This test confirms correct behaviour when IdP was enabled and unscoped
 2248         token was issued, but disabled before user tries to scope the token.
 2249         Here we assume the unscoped token was already issued and start from
 2250         the moment where IdP is being disabled and unscoped token is being
 2251         used.
 2252 
 2253         Test plan:
 2254         1) Disable IdP
 2255         2) Try scoping unscoped token
 2256 
 2257         """
 2258         enabled_false = {'enabled': False}
 2259         PROVIDERS.federation_api.update_idp(self.IDP, enabled_false)
 2260         self.v3_create_token(
 2261             self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER,
 2262             expected_status=http.client.FORBIDDEN)
 2263 
 2264     def test_validate_token_after_deleting_idp_raises_not_found(self):
 2265         token = self.v3_create_token(
 2266             self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_ADMIN
 2267         )
 2268         token_id = token.headers.get('X-Subject-Token')
 2269         federated_info = token.json_body['token']['user']['OS-FEDERATION']
 2270         idp_id = federated_info['identity_provider']['id']
 2271         PROVIDERS.federation_api.delete_idp(idp_id)
 2272         headers = {
 2273             'X-Subject-Token': token_id
 2274         }
 2275         # NOTE(lbragstad): This raises a 404 NOT FOUND because the identity
 2276         # provider is no longer present. We raise 404 NOT FOUND when we
 2277         # validate a token and a project or domain no longer exists.
 2278         self.get(
 2279             '/auth/tokens/',
 2280             token=token_id,
 2281             headers=headers,
 2282             expected_status=http.client.NOT_FOUND
 2283         )
 2284 
 2285     def test_deleting_idp_cascade_deleting_fed_user(self):
 2286         token = self.v3_create_token(
 2287             self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_ADMIN
 2288         )
 2289         federated_info = token.json_body['token']['user']['OS-FEDERATION']
 2290         idp_id = federated_info['identity_provider']['id']
 2291 
 2292         # There are three fed users (from 'EMPLOYEE_ASSERTION',
 2293         # 'CUSTOMER_ASSERTION', 'ADMIN_ASSERTION') with the specified idp.
 2294         hints = driver_hints.Hints()
 2295         hints.add_filter('idp_id', idp_id)
 2296         fed_users = PROVIDERS.shadow_users_api.get_federated_users(hints)
 2297         self.assertEqual(3, len(fed_users))
 2298         idp_domain_id = PROVIDERS.federation_api.get_idp(idp_id)['domain_id']
 2299         for fed_user in fed_users:
 2300             self.assertEqual(idp_domain_id, fed_user['domain_id'])
 2301 
 2302         # Delete the idp
 2303         PROVIDERS.federation_api.delete_idp(idp_id)
 2304 
 2305         # The related federated user should be deleted as well.
 2306         hints = driver_hints.Hints()
 2307         hints.add_filter('idp_id', idp_id)
 2308         fed_users = PROVIDERS.shadow_users_api.get_federated_users(hints)
 2309         self.assertEqual([], fed_users)
 2310 
 2311     def test_scope_to_bad_project(self):
 2312         """Scope unscoped token with a project we don't have access to."""
 2313         self.v3_create_token(
 2314             self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER,
 2315             expected_status=http.client.UNAUTHORIZED)
 2316 
 2317     def test_scope_to_project_multiple_times(self):
 2318         """Try to scope the unscoped token multiple times.
 2319 
 2320         The new tokens should be scoped to:
 2321 
 2322         * Customers' project
 2323         * Employees' project
 2324 
 2325         """
 2326         bodies = (self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_ADMIN,
 2327                   self.TOKEN_SCOPE_PROJECT_CUSTOMER_FROM_ADMIN)
 2328         project_ids = (self.proj_employees['id'],
 2329                        self.proj_customers['id'])
 2330         for body, project_id_ref in zip(bodies, project_ids):
 2331             r = self.v3_create_token(body)
 2332             token_resp = r.result['token']
 2333             self._check_project_scoped_token_attributes(token_resp,
 2334                                                         project_id_ref)
 2335 
 2336     def test_scope_to_project_with_duplicate_roles_returns_single_role(self):
 2337         r = self.v3_create_token(self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_ADMIN)
 2338 
 2339         # Even though the process of obtaining a token shows that there is a
 2340         # role assignment on a project, we should attempt to create a duplicate
 2341         # assignment somewhere. Do this by creating a direct role assignment
 2342         # with each role against the project the token was scoped to.
 2343         user_id = r.json_body['token']['user']['id']
 2344         project_id = r.json_body['token']['project']['id']
 2345         for role in r.json_body['token']['roles']:
 2346             PROVIDERS.assignment_api.create_grant(
 2347                 role_id=role['id'], user_id=user_id, project_id=project_id
 2348             )
 2349 
 2350         # Ensure all roles in the token are unique even though we know there
 2351         # should be duplicate role assignment from the assertions and the
 2352         # direct role assignments we just created.
 2353         r = self.v3_create_token(self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_ADMIN)
 2354         known_role_ids = []
 2355         for role in r.json_body['token']['roles']:
 2356             self.assertNotIn(role['id'], known_role_ids)
 2357             known_role_ids.append(role['id'])
 2358 
 2359     def test_scope_to_project_with_only_inherited_roles(self):
 2360         """Try to scope token whose only roles are inherited."""
 2361         r = self.v3_create_token(
 2362             self.TOKEN_SCOPE_PROJECT_INHERITED_FROM_CUSTOMER)
 2363         token_resp = r.result['token']
 2364         self._check_project_scoped_token_attributes(
 2365             token_resp, self.project_inherited['id'])
 2366         roles_ref = [self.role_customer]
 2367         projects_ref = self.project_inherited
 2368         self._check_projects_and_roles(token_resp, roles_ref, projects_ref)
 2369         self.assertValidMappedUser(token_resp)
 2370 
 2371     def test_scope_token_from_nonexistent_unscoped_token(self):
 2372         """Try to scope token from non-existent unscoped token."""
 2373         self.v3_create_token(
 2374             self.TOKEN_SCOPE_PROJECT_FROM_NONEXISTENT_TOKEN,
 2375             expected_status=http.client.NOT_FOUND)
 2376 
 2377     def test_issue_token_from_rules_without_user(self):
 2378         environ = copy.deepcopy(mapping_fixtures.BAD_TESTER_ASSERTION)
 2379         with self.make_request(environ=environ):
 2380             self.assertRaises(exception.Unauthorized,
 2381                               authentication.authenticate_for_token,
 2382                               self.UNSCOPED_V3_SAML2_REQ)
 2383 
 2384     def test_issue_token_with_nonexistent_group(self):
 2385         """Inject assertion that matches rule issuing bad group id.
 2386 
 2387         Expect server to find out that some groups are missing in the
 2388         backend and raise exception.MappedGroupNotFound exception.
 2389 
 2390         """
 2391         self.assertRaises(exception.MappedGroupNotFound,
 2392                           self._issue_unscoped_token,
 2393                           assertion='CONTRACTOR_ASSERTION')
 2394 
 2395     def test_scope_to_domain_once(self):
 2396         r = self.v3_create_token(self.TOKEN_SCOPE_DOMAIN_A_FROM_CUSTOMER)
 2397         token_resp = r.result['token']
 2398         self._check_domain_scoped_token_attributes(token_resp,
 2399                                                    self.domainA['id'])
 2400 
 2401     def test_scope_to_domain_multiple_tokens(self):
 2402         """Issue multiple tokens scoping to different domains.
 2403 
 2404         The new tokens should be scoped to:
 2405 
 2406         * domainA
 2407         * domainB
 2408         * domainC
 2409 
 2410         """
 2411         bodies = (self.TOKEN_SCOPE_DOMAIN_A_FROM_ADMIN,
 2412                   self.TOKEN_SCOPE_DOMAIN_B_FROM_ADMIN,
 2413                   self.TOKEN_SCOPE_DOMAIN_C_FROM_ADMIN)
 2414         domain_ids = (self.domainA['id'],
 2415                       self.domainB['id'],
 2416                       self.domainC['id'])
 2417 
 2418         for body, domain_id_ref in zip(bodies, domain_ids):
 2419             r = self.v3_create_token(body)
 2420             token_resp = r.result['token']
 2421             self._check_domain_scoped_token_attributes(token_resp,
 2422                                                        domain_id_ref)
 2423 
 2424     def test_scope_to_domain_with_only_inherited_roles_fails(self):
 2425         """Try to scope to a domain that has no direct roles."""
 2426         self.v3_create_token(
 2427             self.TOKEN_SCOPE_DOMAIN_D_FROM_CUSTOMER,
 2428             expected_status=http.client.UNAUTHORIZED)
 2429 
 2430     def test_list_projects(self):
 2431         urls = ('/OS-FEDERATION/projects', '/auth/projects')
 2432 
 2433         token = (self.tokens['CUSTOMER_ASSERTION'],
 2434                  self.tokens['EMPLOYEE_ASSERTION'],
 2435                  self.tokens['ADMIN_ASSERTION'])
 2436 
 2437         projects_refs = (set([self.proj_customers['id'],
 2438                               self.project_inherited['id']]),
 2439                          set([self.proj_employees['id'],
 2440                               self.project_all['id']]),
 2441                          set([self.proj_employees['id'],
 2442                               self.project_all['id'],
 2443                               self.proj_customers['id'],
 2444                               self.project_inherited['id']]))
 2445 
 2446         for token, projects_ref in zip(token, projects_refs):
 2447             for url in urls:
 2448                 r = self.get(url, token=token)
 2449                 projects_resp = r.result['projects']
 2450                 projects = set(p['id'] for p in projects_resp)
 2451                 self.assertEqual(projects_ref, projects,
 2452                                  'match failed for url %s' % url)
 2453 
 2454     # TODO(samueldmq): Create another test class for role inheritance tests.
 2455     # The advantage would be to reduce the complexity of this test class and
 2456     # have tests specific to this functionality grouped, easing readability and
 2457     # maintenability.
 2458     def test_list_projects_for_inherited_project_assignment(self):
 2459         # Create a subproject
 2460         subproject_inherited = unit.new_project_ref(
 2461             domain_id=self.domainD['id'],
 2462             parent_id=self.project_inherited['id'])
 2463         PROVIDERS.resource_api.create_project(
 2464             subproject_inherited['id'], subproject_inherited
 2465         )
 2466 
 2467         # Create an inherited role assignment
 2468         PROVIDERS.assignment_api.create_grant(
 2469             role_id=self.role_employee['id'],
 2470             group_id=self.group_employees['id'],
 2471             project_id=self.project_inherited['id'],
 2472             inherited_to_projects=True)
 2473 
 2474         # Define expected projects from employee assertion, which contain
 2475         # the created subproject
 2476         expected_project_ids = [self.project_all['id'],
 2477                                 self.proj_employees['id'],
 2478                                 subproject_inherited['id']]
 2479 
 2480         # Assert expected projects for both available URLs
 2481         for url in ('/OS-FEDERATION/projects', '/auth/projects'):
 2482             r = self.get(url, token=self.tokens['EMPLOYEE_ASSERTION'])
 2483             project_ids = [project['id'] for project in r.result['projects']]
 2484 
 2485             self.assertEqual(len(expected_project_ids), len(project_ids))
 2486             for expected_project_id in expected_project_ids:
 2487                 self.assertIn(expected_project_id, project_ids,
 2488                               'Projects match failed for url %s' % url)
 2489 
 2490     def test_list_domains(self):
 2491         urls = ('/OS-FEDERATION/domains', '/auth/domains')
 2492 
 2493         tokens = (self.tokens['CUSTOMER_ASSERTION'],
 2494                   self.tokens['EMPLOYEE_ASSERTION'],
 2495                   self.tokens['ADMIN_ASSERTION'])
 2496 
 2497         # NOTE(henry-nash): domain D does not appear in the expected results
 2498         # since it only had inherited roles (which only apply to projects
 2499         # within the domain)
 2500 
 2501         domain_refs = (set([self.domainA['id']]),
 2502                        set([self.domainA['id'],
 2503                             self.domainB['id']]),
 2504                        set([self.domainA['id'],
 2505                             self.domainB['id'],
 2506                             self.domainC['id']]))
 2507 
 2508         for token, domains_ref in zip(tokens, domain_refs):
 2509             for url in urls:
 2510                 r = self.get(url, token=token)
 2511                 domains_resp = r.result['domains']
 2512                 domains = set(p['id'] for p in domains_resp)
 2513                 self.assertEqual(domains_ref, domains,
 2514                                  'match failed for url %s' % url)
 2515 
 2516     def test_full_workflow(self):
 2517         """Test 'standard' workflow for granting access tokens.
 2518 
 2519         * Issue unscoped token
 2520         * List available projects based on groups
 2521         * Scope token to one of available projects
 2522 
 2523         """
 2524         r = self._issue_unscoped_token()
 2525         token_resp = render_token.render_token_response_from_model(r)['token']
 2526         # NOTE(lbragstad): Ensure only 'saml2' is in the method list.
 2527         self.assertListEqual(['saml2'], r.methods)
 2528         self.assertValidMappedUser(token_resp)
 2529         employee_unscoped_token_id = r.id
 2530         r = self.get('/auth/projects', token=employee_unscoped_token_id)
 2531         projects = r.result['projects']
 2532         random_project = random.randint(0, len(projects) - 1)
 2533         project = projects[random_project]
 2534 
 2535         v3_scope_request = self._scope_request(employee_unscoped_token_id,
 2536                                                'project', project['id'])
 2537 
 2538         r = self.v3_create_token(v3_scope_request)
 2539         token_resp = r.result['token']
 2540         self.assertIn('token', token_resp['methods'])
 2541         self.assertIn('saml2', token_resp['methods'])
 2542         self._check_project_scoped_token_attributes(token_resp, project['id'])
 2543 
 2544     def test_workflow_with_groups_deletion(self):
 2545         """Test full workflow with groups deletion before token scoping.
 2546 
 2547         The test scenario is as follows:
 2548          - Create group ``group``
 2549          - Create and assign roles to ``group`` and ``project_all``
 2550          - Patch mapping rules for existing IdP so it issues group id
 2551          - Issue unscoped token with ``group``'s id
 2552          - Delete group ``group``
 2553          - Scope token to ``project_all``
 2554          - Expect HTTP 500 response
 2555 
 2556         """
 2557         # create group and role
 2558         group = unit.new_group_ref(domain_id=self.domainA['id'])
 2559         group = PROVIDERS.identity_api.create_group(group)
 2560         role = unit.new_role_ref()
 2561         PROVIDERS.role_api.create_role(role['id'], role)
 2562 
 2563         # assign role to group and project_admins
 2564         PROVIDERS.assignment_api.create_grant(
 2565             role['id'], group_id=group['id'], project_id=self.project_all['id']
 2566         )
 2567 
 2568         rules = {
 2569             'rules': [
 2570                 {
 2571                     'local': [
 2572                         {
 2573                             'group': {
 2574                                 'id': group['id']
 2575                             }
 2576                         },
 2577                         {
 2578                             'user': {
 2579                                 'name': '{0}'
 2580                             }
 2581                         }
 2582                     ],
 2583                     'remote': [
 2584                         {
 2585                             'type': 'UserName'
 2586                         },
 2587                         {
 2588                             'type': 'LastName',
 2589                             'any_one_of': [
 2590                                 'Account'
 2591                             ]
 2592                         }
 2593                     ]
 2594                 }
 2595             ]
 2596         }
 2597 
 2598         PROVIDERS.federation_api.update_mapping(self.mapping['id'], rules)
 2599 
 2600         r = self._issue_unscoped_token(assertion='TESTER_ASSERTION')
 2601 
 2602         # delete group
 2603         PROVIDERS.identity_api.delete_group(group['id'])
 2604 
 2605         # scope token to project_all, expect HTTP 500
 2606         scoped_token = self._scope_request(
 2607             r.id, 'project',
 2608             self.project_all['id'])
 2609 
 2610         self.v3_create_token(
 2611             scoped_token, expected_status=http.client.INTERNAL_SERVER_ERROR)
 2612 
 2613     def test_lists_with_missing_group_in_backend(self):
 2614         """Test a mapping that points to a group that does not exist.
 2615 
 2616         For explicit mappings, we expect the group to exist in the backend,
 2617         but for lists, specifically blacklists, a missing group is expected
 2618         as many groups will be specified by the IdP that are not Keystone
 2619         groups.
 2620 
 2621         The test scenario is as follows:
 2622          - Create group ``EXISTS``
 2623          - Set mapping rules for existing IdP with a blacklist
 2624            that passes through as REMOTE_USER_GROUPS
 2625          - Issue unscoped token with on group  ``EXISTS`` id in it
 2626 
 2627         """
 2628         domain_id = self.domainA['id']
 2629         domain_name = self.domainA['name']
 2630         group = unit.new_group_ref(domain_id=domain_id, name='EXISTS')
 2631         group = PROVIDERS.identity_api.create_group(group)
 2632         rules = {
 2633             'rules': [
 2634                 {
 2635                     "local": [
 2636                         {
 2637                             "user": {
 2638                                 "name": "{0}",
 2639                                 "id": "{0}"
 2640                             }
 2641                         }
 2642                     ],
 2643                     "remote": [
 2644                         {
 2645                             "type": "REMOTE_USER"
 2646                         }
 2647                     ]
 2648                 },
 2649                 {
 2650                     "local": [
 2651                         {
 2652                             "groups": "{0}",
 2653                             "domain": {"name": domain_name}
 2654                         }
 2655                     ],
 2656                     "remote": [
 2657                         {
 2658                             "type": "REMOTE_USER_GROUPS",
 2659                         }
 2660                     ]
 2661                 }
 2662             ]
 2663         }
 2664         PROVIDERS.federation_api.update_mapping(self.mapping['id'], rules)
 2665         r = self._issue_unscoped_token(assertion='UNMATCHED_GROUP_ASSERTION')
 2666         assigned_group_ids = r.federated_groups
 2667         self.assertEqual(1, len(assigned_group_ids))
 2668         self.assertEqual(group['id'], assigned_group_ids[0]['id'])
 2669 
 2670     def test_empty_blacklist_passess_all_values(self):
 2671         """Test a mapping with empty blacklist specified.
 2672 
 2673         Not adding a ``blacklist`` keyword to the mapping rules has the same
 2674         effect as adding an empty ``blacklist``.
 2675         In both cases, the mapping engine will not discard any groups that are
 2676         associated with apache environment variables.
 2677 
 2678         This test checks scenario where an empty blacklist was specified.
 2679         Expected result is to allow any value.
 2680 
 2681         The test scenario is as follows:
 2682          - Create group ``EXISTS``
 2683          - Create group ``NO_EXISTS``
 2684          - Set mapping rules for existing IdP with a blacklist
 2685            that passes through as REMOTE_USER_GROUPS
 2686          - Issue unscoped token with groups  ``EXISTS`` and ``NO_EXISTS``
 2687            assigned
 2688 
 2689         """
 2690         domain_id = self.domainA['id']
 2691         domain_name = self.domainA['name']
 2692 
 2693         # Add a group "EXISTS"
 2694         group_exists = unit.new_group_ref(domain_id=domain_id, name='EXISTS')
 2695         group_exists = PROVIDERS.identity_api.create_group(group_exists)
 2696 
 2697         # Add a group "NO_EXISTS"
 2698         group_no_exists = unit.new_group_ref(domain_id=domain_id,
 2699                                              name='NO_EXISTS')
 2700         group_no_exists = PROVIDERS.identity_api.create_group(group_no_exists)
 2701 
 2702         group_ids = set([group_exists['id'], group_no_exists['id']])
 2703 
 2704         rules = {
 2705             'rules': [
 2706                 {
 2707                     "local": [
 2708                         {
 2709                             "user": {
 2710                                 "name": "{0}",
 2711                                 "id": "{0}"
 2712                             }
 2713                         }
 2714                     ],
 2715                     "remote": [
 2716                         {
 2717                             "type": "REMOTE_USER"
 2718                         }
 2719                     ]
 2720                 },
 2721                 {
 2722                     "local": [
 2723                         {
 2724                             "groups": "{0}",
 2725                             "domain": {"name": domain_name}
 2726                         }
 2727                     ],
 2728                     "remote": [
 2729                         {
 2730                             "type": "REMOTE_USER_GROUPS",
 2731                             "blacklist": []
 2732                         }
 2733                     ]
 2734                 }
 2735             ]
 2736         }
 2737         PROVIDERS.federation_api.update_mapping(self.mapping['id'], rules)
 2738         r = self._issue_unscoped_token(assertion='UNMATCHED_GROUP_ASSERTION')
 2739         assigned_group_ids = r.federated_groups
 2740         self.assertEqual(len(group_ids), len(assigned_group_ids))
 2741         for group in assigned_group_ids:
 2742             self.assertIn(group['id'], group_ids)
 2743 
 2744     def test_not_adding_blacklist_passess_all_values(self):
 2745         """Test a mapping without blacklist specified.
 2746 
 2747         Not adding a ``blacklist`` keyword to the mapping rules has the same
 2748         effect as adding an empty ``blacklist``. In both cases all values will
 2749         be accepted and passed.
 2750 
 2751         This test checks scenario where an blacklist was not specified.
 2752         Expected result is to allow any value.
 2753 
 2754         The test scenario is as follows:
 2755          - Create group ``EXISTS``
 2756          - Create group ``NO_EXISTS``
 2757          - Set mapping rules for existing IdP with a blacklist
 2758            that passes through as REMOTE_USER_GROUPS
 2759          - Issue unscoped token with on groups ``EXISTS`` and ``NO_EXISTS``
 2760            assigned
 2761 
 2762         """
 2763         domain_id = self.domainA['id']
 2764         domain_name = self.domainA['name']
 2765 
 2766         # Add a group "EXISTS"
 2767         group_exists = unit.new_group_ref(domain_id=domain_id,
 2768                                           name='EXISTS')
 2769         group_exists = PROVIDERS.identity_api.create_group(group_exists)
 2770 
 2771         # Add a group "NO_EXISTS"
 2772         group_no_exists = unit.new_group_ref(domain_id=domain_id,
 2773                                              name='NO_EXISTS')
 2774         group_no_exists = PROVIDERS.identity_api.create_group(group_no_exists)
 2775 
 2776         group_ids = set([group_exists['id'], group_no_exists['id']])
 2777 
 2778         rules = {
 2779             'rules': [
 2780                 {
 2781                     "local": [
 2782                         {
 2783                             "user": {
 2784                                 "name": "{0}",
 2785                                 "id": "{0}"
 2786                             }
 2787                         }
 2788                     ],
 2789                     "remote": [
 2790                         {
 2791                             "type": "REMOTE_USER"
 2792                         }
 2793                     ]
 2794                 },
 2795                 {
 2796                     "local": [
 2797                         {
 2798                             "groups": "{0}",
 2799                             "domain": {"name": domain_name}
 2800                         }
 2801                     ],
 2802                     "remote": [
 2803                         {
 2804                             "type": "REMOTE_USER_GROUPS",
 2805                         }
 2806                     ]
 2807                 }
 2808             ]
 2809         }
 2810         PROVIDERS.federation_api.update_mapping(self.mapping['id'], rules)
 2811         r = self._issue_unscoped_token(assertion='UNMATCHED_GROUP_ASSERTION')
 2812         assigned_group_ids = r.federated_groups
 2813         self.assertEqual(len(group_ids), len(assigned_group_ids))
 2814         for group in assigned_group_ids:
 2815             self.assertIn(group['id'], group_ids)
 2816 
 2817     def test_empty_whitelist_discards_all_values(self):
 2818         """Test that empty whitelist blocks all the values.
 2819 
 2820         Not adding a ``whitelist`` keyword to the mapping value is different
 2821         than adding empty whitelist.  The former case will simply pass all the
 2822         values, whereas the latter would discard all the values.
 2823 
 2824         This test checks scenario where an empty whitelist was specified.
 2825         The expected result is that no groups are matched.
 2826 
 2827         The test scenario is as follows:
 2828          - Create group ``EXISTS``
 2829          - Set mapping rules for existing IdP with an empty whitelist
 2830            that whould discard any values from the assertion
 2831          - Try issuing unscoped token, no groups were matched and that the
 2832            federated user does not have any group assigned.
 2833 
 2834         """
 2835         domain_id = self.domainA['id']
 2836         domain_name = self.domainA['name']
 2837         group = unit.new_group_ref(domain_id=domain_id, name='EXISTS')
 2838         group = PROVIDERS.identity_api.create_group(group)
 2839         rules = {
 2840             'rules': [
 2841                 {
 2842                     "local": [
 2843                         {
 2844                             "user": {
 2845                                 "name": "{0}",
 2846                                 "id": "{0}"
 2847                             }
 2848                         }
 2849                     ],
 2850                     "remote": [
 2851                         {
 2852                             "type": "REMOTE_USER"
 2853                         }
 2854                     ]
 2855                 },
 2856                 {
 2857                     "local": [
 2858                         {
 2859                             "groups": "{0}",
 2860                             "domain": {"name": domain_name}
 2861                         }
 2862                     ],
 2863                     "remote": [
 2864                         {
 2865                             "type": "REMOTE_USER_GROUPS",
 2866                             "whitelist": []
 2867                         }
 2868                     ]
 2869                 }
 2870             ]
 2871         }
 2872         PROVIDERS.federation_api.update_mapping(self.mapping['id'], rules)
 2873         r = self._issue_unscoped_token(assertion='UNMATCHED_GROUP_ASSERTION')
 2874         assigned_groups = r.federated_groups
 2875         self.assertEqual(len(assigned_groups), 0)
 2876 
 2877     def test_not_setting_whitelist_accepts_all_values(self):
 2878         """Test that not setting whitelist passes.
 2879 
 2880         Not adding a ``whitelist`` keyword to the mapping value is different
 2881         than adding empty whitelist.  The former case will simply pass all the
 2882         values, whereas the latter would discard all the values.
 2883 
 2884         This test checks a scenario where a ``whitelist`` was not specified.
 2885         Expected result is that no groups are ignored.
 2886 
 2887         The test scenario is as follows:
 2888          - Create group ``EXISTS``
 2889          - Set mapping rules for existing IdP with an empty whitelist
 2890            that whould discard any values from the assertion
 2891          - Issue an unscoped token and make sure ephemeral user is a member of
 2892            two groups.
 2893 
 2894         """
 2895         domain_id = self.domainA['id']
 2896         domain_name = self.domainA['name']
 2897 
 2898         # Add a group "EXISTS"
 2899         group_exists = unit.new_group_ref(domain_id=domain_id,
 2900                                           name='EXISTS')
 2901         group_exists = PROVIDERS.identity_api.create_group(group_exists)
 2902 
 2903         # Add a group "NO_EXISTS"
 2904         group_no_exists = unit.new_group_ref(domain_id=domain_id,
 2905                                              name='NO_EXISTS')
 2906         group_no_exists = PROVIDERS.identity_api.create_group(group_no_exists)
 2907 
 2908         group_ids = set([group_exists['id'], group_no_exists['id']])
 2909 
 2910         rules = {
 2911             'rules': [
 2912                 {
 2913                     "local": [
 2914                         {
 2915                             "user": {
 2916                                 "name": "{0}",
 2917                                 "id": "{0}"
 2918                             }
 2919                         }
 2920                     ],
 2921                     "remote": [
 2922                         {
 2923                             "type": "REMOTE_USER"
 2924                         }
 2925                     ]
 2926                 },
 2927                 {
 2928                     "local": [
 2929                         {
 2930                             "groups": "{0}",
 2931                             "domain": {"name": domain_name}
 2932                         }
 2933                     ],
 2934                     "remote": [
 2935                         {
 2936                             "type": "REMOTE_USER_GROUPS",
 2937                         }
 2938                     ]
 2939                 }
 2940             ]
 2941         }
 2942         PROVIDERS.federation_api.update_mapping(self.mapping['id'], rules)
 2943         r = self._issue_unscoped_token(assertion='UNMATCHED_GROUP_ASSERTION')
 2944         assigned_group_ids = r.federated_groups
 2945         self.assertEqual(len(group_ids), len(assigned_group_ids))
 2946         for group in assigned_group_ids:
 2947             self.assertIn(group['id'], group_ids)
 2948 
 2949     def test_assertion_prefix_parameter(self):
 2950         """Test parameters filtering based on the prefix.
 2951 
 2952         With ``assertion_prefix`` set to fixed, non default value,
 2953         issue an unscoped token from assertion EMPLOYEE_ASSERTION_PREFIXED.
 2954         Expect server to return unscoped token.
 2955 
 2956         """
 2957         self.config_fixture.config(group='federation',
 2958                                    assertion_prefix=self.ASSERTION_PREFIX)
 2959         self._issue_unscoped_token(assertion='EMPLOYEE_ASSERTION_PREFIXED')
 2960 
 2961     def test_assertion_prefix_parameter_expect_fail(self):
 2962         """Test parameters filtering based on the prefix.
 2963 
 2964         With ``assertion_prefix`` default value set to empty string
 2965         issue an unscoped token from assertion EMPLOYEE_ASSERTION.
 2966         Next, configure ``assertion_prefix`` to value ``UserName``.
 2967         Try issuing unscoped token with EMPLOYEE_ASSERTION.
 2968         Expect server to raise exception.Unathorized exception.
 2969 
 2970         """
 2971         self._issue_unscoped_token()
 2972         self.config_fixture.config(group='federation',
 2973                                    assertion_prefix='UserName')
 2974 
 2975         self.assertRaises(exception.Unauthorized,
 2976                           self._issue_unscoped_token)
 2977 
 2978     def test_unscoped_token_has_user_domain(self):
 2979         r = self._issue_unscoped_token()
 2980         self._check_domains_are_valid(
 2981             render_token.render_token_response_from_model(r)['token'])
 2982 
 2983     def test_scoped_token_has_user_domain(self):
 2984         r = self.v3_create_token(
 2985             self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE)
 2986         self._check_domains_are_valid(r.json_body['token'])
 2987 
 2988     def test_issue_unscoped_token_for_local_user(self):
 2989         r = self._issue_unscoped_token(assertion='LOCAL_USER_ASSERTION')
 2990         self.assertListEqual(['saml2'], r.methods)
 2991         self.assertEqual(self.user['id'], r.user_id)
 2992         self.assertEqual(self.user['name'], r.user['name'])
 2993         self.assertEqual(self.domain['id'], r.user_domain['id'])
 2994         # Make sure the token is not scoped
 2995         self.assertIsNone(r.domain_id)
 2996         self.assertIsNone(r.project_id)
 2997         self.assertTrue(r.unscoped)
 2998 
 2999     def test_issue_token_for_local_user_user_not_found(self):
 3000         self.assertRaises(exception.Unauthorized,
 3001                           self._issue_unscoped_token,
 3002                           assertion='ANOTHER_LOCAL_USER_ASSERTION')
 3003 
 3004     def test_user_name_and_id_in_federation_token(self):
 3005         r = self._issue_unscoped_token(assertion='EMPLOYEE_ASSERTION')
 3006         self.assertEqual(
 3007             mapping_fixtures.EMPLOYEE_ASSERTION['UserName'],
 3008             r.user['name'])
 3009         self.assertNotEqual(r.user['name'], r.user_id)
 3010         r = self.v3_create_token(
 3011             self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE)
 3012         token = r.json_body['token']
 3013         self.assertEqual(
 3014             mapping_fixtures.EMPLOYEE_ASSERTION['UserName'],
 3015             token['user']['name'])
 3016         self.assertNotEqual(token['user']['name'], token['user']['id'])
 3017 
 3018     def test_issue_unscoped_token_with_remote_different_from_protocol(self):
 3019         protocol = PROVIDERS.federation_api.get_protocol(
 3020             self.IDP_WITH_REMOTE, self.PROTOCOL
 3021         )
 3022         protocol['remote_id_attribute'] = uuid.uuid4().hex
 3023         PROVIDERS.federation_api.update_protocol(
 3024             self.IDP_WITH_REMOTE, protocol['id'], protocol
 3025         )
 3026         self._issue_unscoped_token(
 3027             idp=self.IDP_WITH_REMOTE,
 3028             environment={
 3029                 protocol['remote_id_attribute']: self.REMOTE_IDS[0]
 3030             }
 3031         )
 3032         self.assertRaises(
 3033             exception.Unauthorized,
 3034             self._issue_unscoped_token,
 3035             idp=self.IDP_WITH_REMOTE,
 3036             environment={uuid.uuid4().hex: self.REMOTE_IDS[0]}
 3037         )
 3038 
 3039 
 3040 class FernetFederatedTokenTests(test_v3.RestfulTestCase, FederatedSetupMixin):
 3041     AUTH_METHOD = 'token'
 3042 
 3043     def load_fixtures(self, fixtures):
 3044         super(FernetFederatedTokenTests, self).load_fixtures(fixtures)
 3045         self.load_federation_sample_data()
 3046 
 3047     def config_overrides(self):
 3048         super(FernetFederatedTokenTests, self).config_overrides()
 3049         self.config_fixture.config(group='token', provider='fernet')
 3050         self.useFixture(
 3051             ksfixtures.KeyRepository(
 3052                 self.config_fixture,
 3053                 'fernet_tokens',
 3054                 CONF.fernet_tokens.max_active_keys
 3055             )
 3056         )
 3057 
 3058     def auth_plugin_config_override(self):
 3059         methods = ['saml2', 'token', 'password']
 3060         super(FernetFederatedTokenTests,
 3061               self).auth_plugin_config_override(methods)
 3062 
 3063     def test_federated_unscoped_token(self):
 3064         resp = self._issue_unscoped_token()
 3065         self.assertValidMappedUser(
 3066             render_token.render_token_response_from_model(resp)['token'])
 3067 
 3068     def test_federated_unscoped_token_with_multiple_groups(self):
 3069         assertion = 'ANOTHER_CUSTOMER_ASSERTION'
 3070         resp = self._issue_unscoped_token(assertion=assertion)
 3071         self.assertValidMappedUser(
 3072             render_token.render_token_response_from_model(resp)['token'])
 3073 
 3074     def test_validate_federated_unscoped_token(self):
 3075         resp = self._issue_unscoped_token()
 3076         unscoped_token = resp.id
 3077         # assert that the token we received is valid
 3078         self.get('/auth/tokens/', headers={'X-Subject-Token': unscoped_token})
 3079 
 3080     def test_fernet_full_workflow(self):
 3081         """Test 'standard' workflow for granting Fernet access tokens.
 3082 
 3083         * Issue unscoped token
 3084         * List available projects based on groups
 3085         * Scope token to one of available projects
 3086 
 3087         """
 3088         resp = self._issue_unscoped_token()
 3089         self.assertValidMappedUser(
 3090             render_token.render_token_response_from_model(resp)['token'])
 3091         unscoped_token = resp.id
 3092         resp = self.get('/auth/projects', token=unscoped_token)
 3093         projects = resp.result['projects']
 3094         random_project = random.randint(0, len(projects) - 1)
 3095         project = projects[random_project]
 3096 
 3097         v3_scope_request = self._scope_request(unscoped_token,
 3098                                                'project', project['id'])
 3099 
 3100         resp = self.v3_create_token(v3_scope_request)
 3101         token_resp = resp.result['token']
 3102         self._check_project_scoped_token_attributes(token_resp, project['id'])
 3103 
 3104 
 3105 class JWSFederatedTokenTests(test_v3.RestfulTestCase, FederatedSetupMixin):
 3106     AUTH_METHOD = 'token'
 3107 
 3108     def load_fixtures(self, fixtures):
 3109         super(JWSFederatedTokenTests, self).load_fixtures(fixtures)
 3110         self.load_federation_sample_data()
 3111 
 3112     def config_overrides(self):
 3113         super(JWSFederatedTokenTests, self).config_overrides()
 3114         self.config_fixture.config(group='token', provider='jws')
 3115         self.useFixture(ksfixtures.JWSKeyRepository(self.config_fixture))
 3116 
 3117     def auth_plugin_config_override(self):
 3118         methods = ['saml2', 'token', 'password']
 3119         super(JWSFederatedTokenTests,
 3120               self).auth_plugin_config_override(methods)
 3121 
 3122     def test_federated_unscoped_token(self):
 3123         token_model = self._issue_unscoped_token()
 3124         self.assertValidMappedUser(
 3125             render_token.render_token_response_from_model(token_model)['token']
 3126         )
 3127 
 3128     def test_federated_unscoped_token_with_multiple_groups(self):
 3129         assertion = 'ANOTHER_CUSTOMER_ASSERTION'
 3130         token_model = self._issue_unscoped_token(assertion=assertion)
 3131         self.assertValidMappedUser(
 3132             render_token.render_token_response_from_model(token_model)['token']
 3133         )
 3134 
 3135     def test_validate_federated_unscoped_token(self):
 3136         token_model = self._issue_unscoped_token()
 3137         unscoped_token = token_model.id
 3138         # assert that the token we received is valid
 3139         self.get('/auth/tokens/', headers={'X-Subject-Token': unscoped_token})
 3140 
 3141     def test_jws_full_workflow(self):
 3142         """Test 'standard' workflow for granting JWS tokens.
 3143 
 3144         * Issue unscoped token
 3145         * List available projects based on groups
 3146         * Scope token to one of available projects
 3147 
 3148         """
 3149         token_model = self._issue_unscoped_token()
 3150         self.assertValidMappedUser(
 3151             render_token.render_token_response_from_model(token_model)['token']
 3152         )
 3153         unscoped_token = token_model.id
 3154         resp = self.get('/auth/projects', token=unscoped_token)
 3155         projects = resp.result['projects']
 3156         random_project = random.randint(0, len(projects) - 1)
 3157         project = projects[random_project]
 3158 
 3159         v3_scope_request = self._scope_request(unscoped_token,
 3160                                                'project', project['id'])
 3161 
 3162         resp = self.v3_create_token(v3_scope_request)
 3163         token_resp = resp.result['token']
 3164         self._check_project_scoped_token_attributes(token_resp, project['id'])
 3165 
 3166 
 3167 class FederatedTokenTestsMethodToken(FederatedTokenTests):
 3168     """Test federation operation with unified scoping auth method.
 3169 
 3170     Test all the operations with auth method set to ``token`` as a new, unified
 3171     way for scoping all the tokens.
 3172 
 3173     """
 3174 
 3175     AUTH_METHOD = 'token'
 3176 
 3177     def auth_plugin_config_override(self):
 3178         methods = ['saml2', 'token']
 3179         super(FederatedTokenTests,
 3180               self).auth_plugin_config_override(methods)
 3181 
 3182     def test_full_workflow(self):
 3183         """Test 'standard' workflow for granting access tokens.
 3184 
 3185         * Issue unscoped token
 3186         * List available projects based on groups
 3187         * Scope token to one of available projects
 3188 
 3189         """
 3190         r = self._issue_unscoped_token()
 3191         token_resp = render_token.render_token_response_from_model(r)['token']
 3192         # NOTE(lbragstad): Ensure only 'saml2' is in the method list.
 3193         self.assertListEqual(['saml2'], r.methods)
 3194         self.assertValidMappedUser(token_resp)
 3195         employee_unscoped_token_id = r.id
 3196         r = self.get('/auth/projects', token=employee_unscoped_token_id)
 3197         projects = r.result['projects']
 3198         random_project = random.randint(0, len(projects) - 1)
 3199         project = projects[random_project]
 3200 
 3201         v3_scope_request = self._scope_request(employee_unscoped_token_id,
 3202                                                'project', project['id'])
 3203 
 3204         r = self.v3_create_token(v3_scope_request)
 3205         token_resp = r.result['token']
 3206         self.assertIn('token', token_resp['methods'])
 3207         self.assertIn('saml2', token_resp['methods'])
 3208         self._check_project_scoped_token_attributes(token_resp, project['id'])
 3209 
 3210 
 3211 class FederatedUserTests(test_v3.RestfulTestCase, FederatedSetupMixin):
 3212     """Test for federated users.
 3213 
 3214     Tests new shadow users functionality
 3215 
 3216     """
 3217 
 3218     def auth_plugin_config_override(self):
 3219         methods = ['saml2', 'token']
 3220         super(FederatedUserTests, self).auth_plugin_config_override(methods)
 3221 
 3222     def load_fixtures(self, fixtures):
 3223         super(FederatedUserTests, self).load_fixtures(fixtures)
 3224         self.load_federation_sample_data()
 3225 
 3226     def test_user_id_persistense(self):
 3227         """Ensure user_id is persistend for multiple federated authn calls."""
 3228         r = self._issue_unscoped_token()
 3229         user_id = r.user_id
 3230         self.assertNotEmpty(PROVIDERS.identity_api.get_user(user_id))
 3231 
 3232         r = self._issue_unscoped_token()
 3233         user_id2 = r.user_id
 3234         self.assertNotEmpty(PROVIDERS.identity_api.get_user(user_id2))
 3235         self.assertEqual(user_id, user_id2)
 3236 
 3237     def test_user_role_assignment(self):
 3238         # create project and role
 3239         project_ref = unit.new_project_ref(
 3240             domain_id=CONF.identity.default_domain_id)
 3241         PROVIDERS.resource_api.create_project(project_ref['id'], project_ref)
 3242         role_ref = unit.new_role_ref()
 3243         PROVIDERS.role_api.create_role(role_ref['id'], role_ref)
 3244 
 3245         # authenticate via saml get back a user id
 3246         user_id, unscoped_token = self._authenticate_via_saml()
 3247 
 3248         # exchange an unscoped token for a scoped token; resulting in
 3249         # unauthorized because the user doesn't have any role assignments
 3250         v3_scope_request = self._scope_request(unscoped_token, 'project',
 3251                                                project_ref['id'])
 3252         r = self.v3_create_token(v3_scope_request,
 3253                                  expected_status=http.client.UNAUTHORIZED)
 3254 
 3255         # assign project role to federated user
 3256         PROVIDERS.assignment_api.add_role_to_user_and_project(
 3257             user_id, project_ref['id'], role_ref['id'])
 3258 
 3259         # exchange an unscoped token for a scoped token
 3260         r = self.v3_create_token(v3_scope_request,
 3261                                  expected_status=http.client.CREATED)
 3262         scoped_token = r.headers['X-Subject-Token']
 3263 
 3264         # ensure user can access resource based on role assignment
 3265         path = '/projects/%(project_id)s' % {'project_id': project_ref['id']}
 3266         r = self.v3_request(path=path, method='GET',
 3267                             expected_status=http.client.OK,
 3268                             token=scoped_token)
 3269         self.assertValidProjectResponse(r, project_ref)
 3270 
 3271         # create a 2nd project
 3272         project_ref2 = unit.new_project_ref(
 3273             domain_id=CONF.identity.default_domain_id)
 3274         PROVIDERS.resource_api.create_project(project_ref2['id'], project_ref2)
 3275 
 3276         # ensure the user cannot access the 2nd resource (forbidden)
 3277         path = '/projects/%(project_id)s' % {'project_id': project_ref2['id']}
 3278         r = self.v3_request(path=path, method='GET',
 3279                             expected_status=http.client.FORBIDDEN,
 3280                             token=scoped_token)
 3281 
 3282     def test_domain_scoped_user_role_assignment(self):
 3283         # create domain and role
 3284         domain_ref = unit.new_domain_ref()
 3285         PROVIDERS.resource_api.create_domain(domain_ref['id'], domain_ref)
 3286         role_ref = unit.new_role_ref()
 3287         PROVIDERS.role_api.create_role(role_ref['id'], role_ref)
 3288 
 3289         # authenticate via saml get back a user id
 3290         user_id, unscoped_token = self._authenticate_via_saml()
 3291 
 3292         # exchange an unscoped token for a scoped token; resulting in
 3293         # unauthorized because the user doesn't have any role assignments
 3294         v3_scope_request = self._scope_request(unscoped_token, 'domain',
 3295                                                domain_ref['id'])
 3296         r = self.v3_create_token(v3_scope_request,
 3297                                  expected_status=http.client.UNAUTHORIZED)
 3298 
 3299         # assign domain role to user
 3300         PROVIDERS.assignment_api.create_grant(
 3301             role_ref['id'], user_id=user_id, domain_id=domain_ref['id']
 3302         )
 3303 
 3304         # exchange an unscoped token for domain scoped token and test
 3305         r = self.v3_create_token(v3_scope_request,
 3306                                  expected_status=http.client.CREATED)
 3307         self.assertIsNotNone(r.headers.get('X-Subject-Token'))
 3308         token_resp = r.result['token']
 3309         self.assertIn('domain', token_resp)
 3310 
 3311     def test_auth_projects_matches_federation_projects(self):
 3312         # create project and role
 3313         project_ref = unit.new_project_ref(
 3314             domain_id=CONF.identity.default_domain_id)
 3315         PROVIDERS.resource_api.create_project(project_ref['id'], project_ref)
 3316         role_ref = unit.new_role_ref()
 3317         PROVIDERS.role_api.create_role(role_ref['id'], role_ref)
 3318 
 3319         # authenticate via saml get back a user id
 3320         user_id, unscoped_token = self._authenticate_via_saml()
 3321 
 3322         # assign project role to federated user
 3323         PROVIDERS.assignment_api.add_role_to_user_and_project(
 3324             user_id, project_ref['id'], role_ref['id'])
 3325 
 3326         # get auth projects
 3327         r = self.get('/auth/projects', token=unscoped_token)
 3328         auth_projects = r.result['projects']
 3329 
 3330         # get federation projects
 3331         r = self.get('/OS-FEDERATION/projects', token=unscoped_token)
 3332         fed_projects = r.result['projects']
 3333 
 3334         # compare
 3335         self.assertItemsEqual(auth_projects, fed_projects)
 3336 
 3337     def test_auth_projects_matches_federation_projects_with_group_assign(self):
 3338         # create project, role, group
 3339         domain_id = CONF.identity.default_domain_id
 3340         project_ref = unit.new_project_ref(domain_id=domain_id)
 3341         PROVIDERS.resource_api.create_project(project_ref['id'], project_ref)
 3342         role_ref = unit.new_role_ref()
 3343         PROVIDERS.role_api.create_role(role_ref['id'], role_ref)
 3344         group_ref = unit.new_group_ref(domain_id=domain_id)
 3345         group_ref = PROVIDERS.identity_api.create_group(group_ref)
 3346 
 3347         # authenticate via saml get back a user id
 3348         user_id, unscoped_token = self._authenticate_via_saml()
 3349 
 3350         # assign role to group at project
 3351         PROVIDERS.assignment_api.create_grant(
 3352             role_ref['id'], group_id=group_ref['id'],
 3353             project_id=project_ref['id'], domain_id=domain_id
 3354         )
 3355 
 3356         # add user to group
 3357         PROVIDERS.identity_api.add_user_to_group(
 3358             user_id=user_id, group_id=group_ref['id']
 3359         )
 3360 
 3361         # get auth projects
 3362         r = self.get('/auth/projects', token=unscoped_token)
 3363         auth_projects = r.result['projects']
 3364 
 3365         # get federation projects
 3366         r = self.get('/OS-FEDERATION/projects', token=unscoped_token)
 3367         fed_projects = r.result['projects']
 3368 
 3369         # compare
 3370         self.assertItemsEqual(auth_projects, fed_projects)
 3371 
 3372     def test_auth_domains_matches_federation_domains(self):
 3373         # create domain and role
 3374         domain_ref = unit.new_domain_ref()
 3375         PROVIDERS.resource_api.create_domain(domain_ref['id'], domain_ref)
 3376         role_ref = unit.new_role_ref()
 3377         PROVIDERS.role_api.create_role(role_ref['id'], role_ref)
 3378 
 3379         # authenticate via saml get back a user id and token
 3380         user_id, unscoped_token = self._authenticate_via_saml()
 3381 
 3382         # assign domain role to user
 3383         PROVIDERS.assignment_api.create_grant(
 3384             role_ref['id'], user_id=user_id, domain_id=domain_ref['id']
 3385         )
 3386 
 3387         # get auth domains
 3388         r = self.get('/auth/domains', token=unscoped_token)
 3389         auth_domains = r.result['domains']
 3390 
 3391         # get federation domains
 3392         r = self.get('/OS-FEDERATION/domains', token=unscoped_token)
 3393         fed_domains = r.result['domains']
 3394 
 3395         # compare
 3396         self.assertItemsEqual(auth_domains, fed_domains)
 3397 
 3398     def test_auth_domains_matches_federation_domains_with_group_assign(self):
 3399         # create role, group, and domain
 3400         domain_ref = unit.new_domain_ref()
 3401         PROVIDERS.resource_api.create_domain(domain_ref['id'], domain_ref)
 3402         role_ref = unit.new_role_ref()
 3403         PROVIDERS.role_api.create_role(role_ref['id'], role_ref)
 3404         group_ref = unit.new_group_ref(domain_id=domain_ref['id'])
 3405         group_ref = PROVIDERS.identity_api.create_group(group_ref)
 3406 
 3407         # authenticate via saml get back a user id and token
 3408         user_id, unscoped_token = self._authenticate_via_saml()
 3409 
 3410         # assign domain role to group
 3411         PROVIDERS.assignment_api.create_grant(
 3412             role_ref['id'], group_id=group_ref['id'],
 3413             domain_id=domain_ref['id']
 3414         )
 3415 
 3416         # add user to group
 3417         PROVIDERS.identity_api.add_user_to_group(
 3418             user_id=user_id, group_id=group_ref['id']
 3419         )
 3420 
 3421         # get auth domains
 3422         r = self.get('/auth/domains', token=unscoped_token)
 3423         auth_domains = r.result['domains']
 3424 
 3425         # get federation domains
 3426         r = self.get('/OS-FEDERATION/domains', token=unscoped_token)
 3427         fed_domains = r.result['domains']
 3428 
 3429         # compare
 3430         self.assertItemsEqual(auth_domains, fed_domains)
 3431 
 3432     def test_list_head_domains_for_user_duplicates(self):
 3433         # create role
 3434         role_ref = unit.new_role_ref()
 3435         PROVIDERS.role_api.create_role(role_ref['id'], role_ref)
 3436 
 3437         # authenticate via saml get back a user id and token
 3438         user_id, unscoped_token = self._authenticate_via_saml()
 3439 
 3440         # get federation group domains
 3441         r = self.get('/OS-FEDERATION/domains', token=unscoped_token)
 3442         group_domains = r.result['domains']
 3443         domain_from_group = group_domains[0]
 3444 
 3445         self.head(
 3446             '/OS-FEDERATION/domains',
 3447             token=unscoped_token,
 3448             expected_status=http.client.OK
 3449         )
 3450 
 3451         # assign group domain and role to user, this should create a
 3452         # duplicate domain
 3453         PROVIDERS.assignment_api.create_grant(
 3454             role_ref['id'], user_id=user_id, domain_id=domain_from_group['id']
 3455         )
 3456 
 3457         # get user domains via /OS-FEDERATION/domains and test for duplicates
 3458         r = self.get('/OS-FEDERATION/domains', token=unscoped_token)
 3459         user_domains = r.result['domains']
 3460         user_domain_ids = []
 3461         for domain in user_domains:
 3462             self.assertNotIn(domain['id'], user_domain_ids)
 3463             user_domain_ids.append(domain['id'])
 3464 
 3465         # get user domains via /auth/domains and test for duplicates
 3466         r = self.get('/auth/domains', token=unscoped_token)
 3467         user_domains = r.result['domains']
 3468         user_domain_ids = []
 3469         for domain in user_domains:
 3470             self.assertNotIn(domain['id'], user_domain_ids)
 3471             user_domain_ids.append(domain['id'])
 3472 
 3473     def test_list_head_projects_for_user_duplicates(self):
 3474         # create role
 3475         role_ref = unit.new_role_ref()
 3476         PROVIDERS.role_api.create_role(role_ref['id'], role_ref)
 3477 
 3478         # authenticate via saml get back a user id and token
 3479         user_id, unscoped_token = self._authenticate_via_saml()
 3480 
 3481         # get federation group projects
 3482         r = self.get('/OS-FEDERATION/projects', token=unscoped_token)
 3483         group_projects = r.result['projects']
 3484         project_from_group = group_projects[0]
 3485 
 3486         self.head(
 3487             '/OS-FEDERATION/projects',
 3488             token=unscoped_token,
 3489             expected_status=http.client.OK
 3490         )
 3491 
 3492         # assign group project and role to user, this should create a
 3493         # duplicate project
 3494         PROVIDERS.assignment_api.add_role_to_user_and_project(
 3495             user_id, project_from_group['id'], role_ref['id'])
 3496 
 3497         # get user projects via /OS-FEDERATION/projects and test for duplicates
 3498         r = self.get('/OS-FEDERATION/projects', token=unscoped_token)
 3499         user_projects = r.result['projects']
 3500         user_project_ids = []
 3501         for project in user_projects:
 3502             self.assertNotIn(project['id'], user_project_ids)
 3503             user_project_ids.append(project['id'])
 3504 
 3505         # get user projects via /auth/projects and test for duplicates
 3506         r = self.get('/auth/projects', token=unscoped_token)
 3507         user_projects = r.result['projects']
 3508         user_project_ids = []
 3509         for project in user_projects:
 3510             self.assertNotIn(project['id'], user_project_ids)
 3511             user_project_ids.append(project['id'])
 3512 
 3513     def test_delete_protocol_after_federated_authentication(self):
 3514         # Create a protocol
 3515         protocol = self.proto_ref(mapping_id=self.mapping['id'])
 3516         PROVIDERS.federation_api.create_protocol(
 3517             self.IDP, protocol['id'], protocol)
 3518 
 3519         # Authenticate to create a new federated_user entry with a foreign
 3520         # key pointing to the protocol
 3521         r = self._issue_unscoped_token()
 3522         user_id = r.user_id
 3523         self.assertNotEmpty(PROVIDERS.identity_api.get_user(user_id))
 3524 
 3525         # Now we should be able to delete the protocol
 3526         PROVIDERS.federation_api.delete_protocol(self.IDP, protocol['id'])
 3527 
 3528     def _authenticate_via_saml(self):
 3529         r = self._issue_unscoped_token()
 3530         unscoped_token = r.id
 3531         token_resp = render_token.render_token_response_from_model(r)['token']
 3532         self.assertValidMappedUser(token_resp)
 3533         return r.user_id, unscoped_token
 3534 
 3535 
 3536 class ShadowMappingTests(test_v3.RestfulTestCase, FederatedSetupMixin):
 3537     """Test class dedicated to auto-provisioning resources at login.
 3538 
 3539     A shadow mapping is a mapping that contains extra properties about that
 3540     specific federated user's situation based on attributes from the assertion.
 3541     For example, a shadow mapping can tell us that a user should have specific
 3542     role assignments on certain projects within a domain. When a federated user
 3543     authenticates, the shadow mapping will create these entities before
 3544     returning the authenticated response to the user. This test class is
 3545     dedicated to testing specific aspects of shadow mapping when performing
 3546     federated authentication.
 3547     """
 3548 
 3549     def setUp(self):
 3550         super(ShadowMappingTests, self).setUp()
 3551         # update the mapping we have already setup to have specific projects
 3552         # and roles.
 3553         PROVIDERS.federation_api.update_mapping(
 3554             self.mapping['id'],
 3555             mapping_fixtures.MAPPING_PROJECTS
 3556         )
 3557 
 3558         # The shadow mapping we're using in these tests contain a role named
 3559         # `member` and `observer` for the sake of using something other than
 3560         # `admin`. We'll need to create those before hand, otherwise the
 3561         # mapping will fail during authentication because the roles defined in
 3562         # the mapping do not exist yet. The shadow mapping mechanism currently
 3563         # doesn't support creating roles on-the-fly, but this could change in
 3564         # the future after we get some feedback from shadow mapping being used
 3565         # in real deployments. We also want to make sure we are dealing with
 3566         # global roles and not domain-scoped roles. We have specific tests
 3567         # below that test that behavior and the setup is done in the test.
 3568         member_role_ref = unit.new_role_ref(name='member')
 3569         assert member_role_ref['domain_id'] is None
 3570         self.member_role = PROVIDERS.role_api.create_role(
 3571             member_role_ref['id'], member_role_ref
 3572         )
 3573         observer_role_ref = unit.new_role_ref(name='observer')
 3574         assert observer_role_ref['domain_id'] is None
 3575         self.observer_role = PROVIDERS.role_api.create_role(
 3576             observer_role_ref['id'], observer_role_ref
 3577         )
 3578 
 3579         # This is a mapping of the project name to the role that is supposed to
 3580         # be assigned to the user on that project from the shadow mapping.
 3581         self.expected_results = {
 3582             'Production': 'observer',
 3583             'Staging': 'member',
 3584             'Project for tbo': 'admin'
 3585         }
 3586 
 3587     def auth_plugin_config_override(self):
 3588         methods = ['saml2', 'token']
 3589         super(ShadowMappingTests, self).auth_plugin_config_override(methods)
 3590 
 3591     def load_fixtures(self, fixtures):
 3592         super(ShadowMappingTests, self).load_fixtures(fixtures)
 3593         self.load_federation_sample_data()
 3594 
 3595     def test_shadow_mapping_creates_projects(self):
 3596         projects = PROVIDERS.resource_api.list_projects()
 3597         for project in projects:
 3598             self.assertNotIn(project['name'], self.expected_results)
 3599 
 3600         response = self._issue_unscoped_token()
 3601         self.assertValidMappedUser(
 3602             render_token.render_token_response_from_model(response)['token'])
 3603         unscoped_token = response.id
 3604         response = self.get('/auth/projects', token=unscoped_token)
 3605         projects = response.json_body['projects']
 3606         for project in projects:
 3607             project = PROVIDERS.resource_api.get_project_by_name(
 3608                 project['name'],
 3609                 self.idp['domain_id']
 3610             )
 3611             self.assertIn(project['name'], self.expected_results)
 3612 
 3613     def test_shadow_mapping_create_projects_role_assignments(self):
 3614         response = self._issue_unscoped_token()
 3615         self.assertValidMappedUser(
 3616             render_token.render_token_response_from_model(response)['token'])
 3617         unscoped_token = response.id
 3618         response = self.get('/auth/projects', token=unscoped_token)
 3619         projects = response.json_body['projects']
 3620         for project in projects:
 3621             # Ask for a scope token to each project in the mapping. Each token
 3622             # should contain a different role so let's check that is right,
 3623             # too.
 3624             scope = self._scope_request(
 3625                 unscoped_token, 'project', project['id']
 3626             )
 3627             response = self.v3_create_token(scope)
 3628             project_name = response.json_body['token']['project']['name']
 3629             roles = response.json_body['token']['roles']
 3630             self.assertEqual(
 3631                 self.expected_results[project_name], roles[0]['name']
 3632             )
 3633 
 3634     def test_shadow_mapping_does_not_create_roles(self):
 3635         # If a role required by the mapping does not exist, then we should fail
 3636         # the mapping since shadow mapping currently does not support creating
 3637         # mappings on-the-fly.
 3638         PROVIDERS.role_api.delete_role(self.observer_role['id'])
 3639         self.assertRaises(exception.RoleNotFound, self._issue_unscoped_token)
 3640 
 3641     def test_shadow_mapping_creates_project_in_identity_provider_domain(self):
 3642         response = self._issue_unscoped_token()
 3643         self.assertValidMappedUser(
 3644             render_token.render_token_response_from_model(response)['token'])
 3645         unscoped_token = response.id
 3646         response = self.get('/auth/projects', token=unscoped_token)
 3647         projects = response.json_body['projects']
 3648         for project in projects:
 3649             self.assertEqual(project['domain_id'], self.idp['domain_id'])
 3650 
 3651     def test_shadow_mapping_is_idempotent(self):
 3652         """Test that projects remain idempotent for every federated auth."""
 3653         response = self._issue_unscoped_token()
 3654         self.assertValidMappedUser(
 3655             render_token.render_token_response_from_model(response)['token'])
 3656         unscoped_token = response.id
 3657         response = self.get('/auth/projects', token=unscoped_token)
 3658         project_ids = [p['id'] for p in response.json_body['projects']]
 3659         response = self._issue_unscoped_token()
 3660         unscoped_token = response.id
 3661         response = self.get('/auth/projects', token=unscoped_token)
 3662         projects = response.json_body['projects']
 3663         for project in projects:
 3664             self.assertIn(project['id'], project_ids)
 3665 
 3666     def test_roles_outside_idp_domain_fail_mapping(self):
 3667         # Create a new domain
 3668         d = unit.new_domain_ref()
 3669         new_domain = PROVIDERS.resource_api.create_domain(d['id'], d)
 3670 
 3671         # Delete the member role and recreate it in a different domain
 3672         PROVIDERS.role_api.delete_role(self.member_role['id'])
 3673         member_role_ref = unit.new_role_ref(
 3674             name='member',
 3675             domain_id=new_domain['id']
 3676         )
 3677         PROVIDERS.role_api.create_role(member_role_ref['id'], member_role_ref)
 3678         self.assertRaises(
 3679             exception.DomainSpecificRoleNotWithinIdPDomain,
 3680             self._issue_unscoped_token
 3681         )
 3682 
 3683     def test_roles_in_idp_domain_can_be_assigned_from_mapping(self):
 3684         # Delete the member role and recreate it in the domain of the idp
 3685         PROVIDERS.role_api.delete_role(self.member_role['id'])
 3686         member_role_ref = unit.new_role_ref(
 3687             name='member',
 3688             domain_id=self.idp['domain_id']
 3689         )
 3690         PROVIDERS.role_api.create_role(member_role_ref['id'], member_role_ref)
 3691         response = self._issue_unscoped_token()
 3692         user_id = response.user_id
 3693         unscoped_token = response.id
 3694         response = self.get('/auth/projects', token=unscoped_token)
 3695         projects = response.json_body['projects']
 3696         staging_project = PROVIDERS.resource_api.get_project_by_name(
 3697             'Staging', self.idp['domain_id']
 3698         )
 3699         for project in projects:
 3700             # Even though the mapping successfully assigned the Staging project
 3701             # a member role for our user, the /auth/projects response doesn't
 3702             # include projects with only domain-specific role assignments.
 3703             self.assertNotEqual(project['name'], 'Staging')
 3704         domain_role_assignments = (
 3705             PROVIDERS.assignment_api.list_role_assignments(
 3706                 user_id=user_id,
 3707                 project_id=staging_project['id'],
 3708                 strip_domain_roles=False
 3709             )
 3710         )
 3711         self.assertEqual(
 3712             staging_project['id'], domain_role_assignments[0]['project_id']
 3713         )
 3714         self.assertEqual(
 3715             user_id, domain_role_assignments[0]['user_id']
 3716         )
 3717 
 3718     def test_mapping_with_groups_includes_projects_with_group_assignment(self):
 3719         # create a group called Observers
 3720         observer_group = unit.new_group_ref(
 3721             domain_id=self.idp['domain_id'],
 3722             name='Observers'
 3723         )
 3724         observer_group = PROVIDERS.identity_api.create_group(observer_group)
 3725         # make sure the Observers group has a role on the finance project
 3726         finance_project = unit.new_project_ref(
 3727             domain_id=self.idp['domain_id'],
 3728             name='Finance'
 3729         )
 3730         finance_project = PROVIDERS.resource_api.create_project(
 3731             finance_project['id'], finance_project
 3732         )
 3733         PROVIDERS.assignment_api.create_grant(
 3734             self.observer_role['id'],
 3735             group_id=observer_group['id'],
 3736             project_id=finance_project['id']
 3737         )
 3738         # update the mapping
 3739         group_rule = {
 3740             'group': {
 3741                 'name': 'Observers',
 3742                 'domain': {
 3743                     'id': self.idp['domain_id']
 3744                 }
 3745             }
 3746         }
 3747         updated_mapping = copy.deepcopy(mapping_fixtures.MAPPING_PROJECTS)
 3748         updated_mapping['rules'][0]['local'].append(group_rule)
 3749         PROVIDERS.federation_api.update_mapping(
 3750             self.mapping['id'], updated_mapping
 3751         )
 3752         response = self._issue_unscoped_token()
 3753         # user_id = response.json_body['token']['user']['id']
 3754         unscoped_token = response.id
 3755         response = self.get('/auth/projects', token=unscoped_token)
 3756         projects = response.json_body['projects']
 3757         self.expected_results = {
 3758             # These assignments are all a result of a direct mapping from the
 3759             # shadow user to the newly created project.
 3760             'Production': 'observer',
 3761             'Staging': 'member',
 3762             'Project for tbo': 'admin',
 3763             # This is a result of the mapping engine maintaining its old
 3764             # behavior.
 3765             'Finance': 'observer'
 3766         }
 3767         for project in projects:
 3768             # Ask for a scope token to each project in the mapping. Each token
 3769             # should contain a different role so let's check that is right,
 3770             # too.
 3771             scope = self._scope_request(
 3772                 unscoped_token, 'project', project['id']
 3773             )
 3774             response = self.v3_create_token(scope)
 3775             project_name = response.json_body['token']['project']['name']
 3776             roles = response.json_body['token']['roles']
 3777             self.assertEqual(
 3778                 self.expected_results[project_name], roles[0]['name']
 3779             )
 3780 
 3781     def test_user_gets_only_assigned_roles(self):
 3782         # in bug 1677723 user could get roles outside of what was assigned
 3783         # to them. This test verifies that this is no longer true.
 3784         # Authenticate once to create the projects
 3785         response = self._issue_unscoped_token()
 3786         self.assertValidMappedUser(
 3787             render_token.render_token_response_from_model(response)['token'])
 3788 
 3789         # Assign admin role to newly-created project to another user
 3790         staging_project = PROVIDERS.resource_api.get_project_by_name(
 3791             'Staging', self.idp['domain_id']
 3792         )
 3793         admin = unit.new_user_ref(CONF.identity.default_domain_id)
 3794         PROVIDERS.identity_api.create_user(admin)
 3795         PROVIDERS.assignment_api.create_grant(
 3796             self.role_admin['id'], user_id=admin['id'],
 3797             project_id=staging_project['id']
 3798         )
 3799 
 3800         # Authenticate again with the federated user and verify roles
 3801         response = self._issue_unscoped_token()
 3802         self.assertValidMappedUser(
 3803             render_token.render_token_response_from_model(response)['token'])
 3804         unscoped_token = response.id
 3805         scope = self._scope_request(
 3806             unscoped_token, 'project', staging_project['id']
 3807         )
 3808         response = self.v3_create_token(scope)
 3809         roles = response.json_body['token']['roles']
 3810         role_ids = [r['id'] for r in roles]
 3811         self.assertNotIn(self.role_admin['id'], role_ids)
 3812 
 3813 
 3814 class JsonHomeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin):
 3815     JSON_HOME_DATA = {
 3816         'https://docs.openstack.org/api/openstack-identity/3/ext/OS-FEDERATION'
 3817         '/1.0/rel/identity_provider': {
 3818             'href-template': '/OS-FEDERATION/identity_providers/{idp_id}',
 3819             'href-vars': {
 3820                 'idp_id': 'https://docs.openstack.org/api/openstack-identity/3'
 3821                 '/ext/OS-FEDERATION/1.0/param/idp_id'
 3822             },
 3823         },
 3824     }
 3825 
 3826 
 3827 def _is_xmlsec1_installed():
 3828     p = subprocess.Popen(
 3829         ['which', 'xmlsec1'],
 3830         stdout=subprocess.PIPE,
 3831         stderr=subprocess.PIPE)
 3832 
 3833     # invert the return code
 3834     return not bool(p.wait())
 3835 
 3836 
 3837 def _load_xml(filename):
 3838     with open(os.path.join(XMLDIR, filename), 'r') as xml:
 3839         return xml.read()
 3840 
 3841 
 3842 class SAMLGenerationTests(test_v3.RestfulTestCase):
 3843 
 3844     SP_AUTH_URL = ('http://beta.com:5000/v3/OS-FEDERATION/identity_providers'
 3845                    '/BETA/protocols/saml2/auth')
 3846 
 3847     ASSERTION_FILE = 'signed_saml2_assertion.xml'
 3848 
 3849     # The values of the following variables match the attributes values found
 3850     # in ASSERTION_FILE
 3851     ISSUER = 'https://acme.com/FIM/sps/openstack/saml20'
 3852     RECIPIENT = 'http://beta.com/Shibboleth.sso/SAML2/POST'
 3853     SUBJECT = 'test_user'
 3854     SUBJECT_DOMAIN = 'user_domain'
 3855     ROLES = ['admin', 'member']
 3856     PROJECT = 'development'
 3857     PROJECT_DOMAIN = 'project_domain'
 3858     GROUPS = ['JSON:{"name":"group1","domain":{"name":"Default"}}',
 3859               'JSON:{"name":"group2","domain":{"name":"Default"}}']
 3860     SAML_GENERATION_ROUTE = '/auth/OS-FEDERATION/saml2'
 3861     ECP_GENERATION_ROUTE = '/auth/OS-FEDERATION/saml2/ecp'
 3862     ASSERTION_VERSION = "2.0"
 3863     SERVICE_PROVDIER_ID = 'ACME'
 3864 
 3865     def setUp(self):
 3866         super(SAMLGenerationTests, self).setUp()
 3867         self.signed_assertion = saml2.create_class_from_xml_string(
 3868             saml.Assertion, _load_xml(self.ASSERTION_FILE))
 3869         self.sp = core.new_service_provider_ref(
 3870             auth_url=self.SP_AUTH_URL, sp_url=self.RECIPIENT
 3871         )
 3872         url = '/OS-FEDERATION/service_providers/' + self.SERVICE_PROVDIER_ID
 3873         self.put(url, body={'service_provider': self.sp},
 3874                  expected_status=http.client.CREATED)
 3875 
 3876     def test_samlize_token_values(self):
 3877         """Test the SAML generator produces a SAML object.
 3878 
 3879         Test the SAML generator directly by passing known arguments, the result
 3880         should be a SAML object that consistently includes attributes based on
 3881         the known arguments that were passed in.
 3882 
 3883         """
 3884         with mock.patch.object(keystone_idp, '_sign_assertion',
 3885                                return_value=self.signed_assertion):
 3886             generator = keystone_idp.SAMLGenerator()
 3887             response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
 3888                                                self.SUBJECT,
 3889                                                self.SUBJECT_DOMAIN,
 3890                                                self.ROLES, self.PROJECT,
 3891                                                self.PROJECT_DOMAIN,
 3892                                                self.GROUPS)
 3893 
 3894         assertion = response.assertion
 3895         self.assertIsNotNone(assertion)
 3896         self.assertIsInstance(assertion, saml.Assertion)
 3897         issuer = response.issuer
 3898         self.assertEqual(self.RECIPIENT, response.destination)
 3899         self.assertEqual(self.ISSUER, issuer.text)
 3900 
 3901         user_attribute = assertion.attribute_statement[0].attribute[0]
 3902         self.assertEqual(self.SUBJECT, user_attribute.attribute_value[0].text)
 3903 
 3904         user_domain_attribute = (
 3905             assertion.attribute_statement[0].attribute[1])
 3906         self.assertEqual(self.SUBJECT_DOMAIN,
 3907                          user_domain_attribute.attribute_value[0].text)
 3908 
 3909         role_attribute = assertion.attribute_statement[0].attribute[2]
 3910         for attribute_value in role_attribute.attribute_value:
 3911             self.assertIn(attribute_value.text, self.ROLES)
 3912 
 3913         project_attribute = assertion.attribute_statement[0].attribute[3]
 3914         self.assertEqual(self.PROJECT,
 3915                          project_attribute.attribute_value[0].text)
 3916 
 3917         project_domain_attribute = (
 3918             assertion.attribute_statement[0].attribute[4])
 3919         self.assertEqual(self.PROJECT_DOMAIN,
 3920                          project_domain_attribute.attribute_value[0].text)
 3921 
 3922         group_attribute = assertion.attribute_statement[0].attribute[5]
 3923         for attribute_value in group_attribute.attribute_value:
 3924             self.assertIn(attribute_value.text, self.GROUPS)
 3925 
 3926     def test_comma_in_certfile_path(self):
 3927         self.config_fixture.config(
 3928             group='saml',
 3929             certfile=CONF.saml.certfile + ',')
 3930         generator = keystone_idp.SAMLGenerator()
 3931         self.assertRaises(
 3932             exception.UnexpectedError,
 3933             generator.samlize_token,
 3934             self.ISSUER,
 3935             self.RECIPIENT,
 3936             self.SUBJECT,
 3937             self.SUBJECT_DOMAIN,
 3938             self.ROLES,
 3939             self.PROJECT,
 3940             self.PROJECT_DOMAIN,
 3941             self.GROUPS)
 3942 
 3943     def test_comma_in_keyfile_path(self):
 3944         self.config_fixture.config(
 3945             group='saml',
 3946             keyfile=CONF.saml.keyfile + ',')
 3947         generator = keystone_idp.SAMLGenerator()
 3948         self.assertRaises(
 3949             exception.UnexpectedError,
 3950             generator.samlize_token,
 3951             self.ISSUER,
 3952             self.RECIPIENT,
 3953             self.SUBJECT,
 3954             self.SUBJECT_DOMAIN,
 3955             self.ROLES,
 3956             self.PROJECT,
 3957             self.PROJECT_DOMAIN,
 3958             self.GROUPS)
 3959 
 3960     def test_verify_assertion_object(self):
 3961         """Test that the Assertion object is built properly.
 3962 
 3963         The Assertion doesn't need to be signed in this test, so
 3964         _sign_assertion method is patched and doesn't alter the assertion.
 3965 
 3966         """
 3967         with mock.patch.object(keystone_idp, '_sign_assertion',
 3968                                side_effect=lambda x: x):
 3969             generator = keystone_idp.SAMLGenerator()
 3970             response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
 3971                                                self.SUBJECT,
 3972                                                self.SUBJECT_DOMAIN,
 3973                                                self.ROLES, self.PROJECT,
 3974                                                self.PROJECT_DOMAIN,
 3975                                                self.GROUPS)
 3976         assertion = response.assertion
 3977         self.assertEqual(self.ASSERTION_VERSION, assertion.version)
 3978 
 3979     def test_valid_saml_xml(self):
 3980         """Test the generated SAML object can become valid XML.
 3981 
 3982         Test the generator directly by passing known arguments, the result
 3983         should be a SAML object that consistently includes attributes based on
 3984         the known arguments that were passed in.
 3985 
 3986         """
 3987         with mock.patch.object(keystone_idp, '_sign_assertion',
 3988                                return_value=self.signed_assertion):
 3989             generator = keystone_idp.SAMLGenerator()
 3990             response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
 3991                                                self.SUBJECT,
 3992                                                self.SUBJECT_DOMAIN,
 3993                                                self.ROLES, self.PROJECT,
 3994                                                self.PROJECT_DOMAIN,
 3995                                                self.GROUPS)
 3996 
 3997         saml_str = response.to_string()
 3998         response = etree.fromstring(saml_str)
 3999         issuer = response[0]
 4000         assertion = response[2]
 4001 
 4002         self.assertEqual(self.RECIPIENT, response.get('Destination'))
 4003         self.assertEqual(self.ISSUER, issuer.text)
 4004 
 4005         user_attribute = assertion[4][0]
 4006         self.assertEqual(self.SUBJECT, user_attribute[0].text)
 4007 
 4008         user_domain_attribute = assertion[4][1]
 4009         self.assertEqual(self.SUBJECT_DOMAIN, user_domain_attribute[0].text)
 4010 
 4011         role_attribute = assertion[4][2]
 4012         for attribute_value in role_attribute:
 4013             self.assertIn(attribute_value.text, self.ROLES)
 4014 
 4015         project_attribute = assertion[4][3]
 4016         self.assertEqual(self.PROJECT, project_attribute[0].text)
 4017 
 4018         project_domain_attribute = assertion[4][4]
 4019         self.assertEqual(self.PROJECT_DOMAIN, project_domain_attribute[0].text)
 4020 
 4021         group_attribute = assertion[4][5]
 4022         for attribute_value in group_attribute:
 4023             self.assertIn(attribute_value.text, self.GROUPS)
 4024 
 4025     def test_assertion_using_explicit_namespace_prefixes(self):
 4026         def mocked_subprocess_check_output(*popenargs, **kwargs):
 4027             # the last option is the assertion file to be signed
 4028             if popenargs[0] != ['/usr/bin/which', CONF.saml.xmlsec1_binary]:
 4029                 filename = popenargs[0][-1]
 4030                 with open(filename, 'r') as f:
 4031                     assertion_content = f.read()
 4032                 # since we are not testing the signature itself, we can return
 4033                 # the assertion as is without signing it
 4034                 return assertion_content
 4035 
 4036         with mock.patch.object(subprocess, 'check_output',
 4037                                side_effect=mocked_subprocess_check_output):
 4038             generator = keystone_idp.SAMLGenerator()
 4039             response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
 4040                                                self.SUBJECT,
 4041                                                self.SUBJECT_DOMAIN,
 4042                                                self.ROLES, self.PROJECT,
 4043                                                self.PROJECT_DOMAIN,
 4044                                                self.GROUPS)
 4045             assertion_xml = response.assertion.to_string()
 4046             # The expected values in the assertions bellow need to be 'str' in
 4047             # Python 2 and 'bytes' in Python 3
 4048             # make sure we have the proper tag and prefix for the assertion
 4049             # namespace
 4050             self.assertIn(b'<saml:Assertion', assertion_xml)
 4051             self.assertIn(
 4052                 ('xmlns:saml="' + saml2.NAMESPACE + '"').encode('utf-8'),
 4053                 assertion_xml)
 4054             self.assertIn(
 4055                 ('xmlns:xmldsig="' + xmldsig.NAMESPACE).encode('utf-8'),
 4056                 assertion_xml)
 4057 
 4058     def test_saml_signing(self):
 4059         """Test that the SAML generator produces a SAML object.
 4060 
 4061         Test the SAML generator directly by passing known arguments, the result
 4062         should be a SAML object that consistently includes attributes based on
 4063         the known arguments that were passed in.
 4064 
 4065         """
 4066         if not _is_xmlsec1_installed():
 4067             self.skipTest('xmlsec1 is not installed')
 4068 
 4069         generator = keystone_idp.SAMLGenerator()
 4070         response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
 4071                                            self.SUBJECT, self.SUBJECT_DOMAIN,
 4072                                            self.ROLES, self.PROJECT,
 4073                                            self.PROJECT_DOMAIN,
 4074                                            self.GROUPS)
 4075 
 4076         signature = response.assertion.signature
 4077         self.assertIsNotNone(signature)
 4078         self.assertIsInstance(signature, xmldsig.Signature)
 4079 
 4080         idp_public_key = sigver.read_cert_from_file(CONF.saml.certfile, 'pem')
 4081         cert_text = signature.key_info.x509_data[0].x509_certificate.text
 4082         # NOTE(stevemar): Rather than one line of text, the certificate is
 4083         # printed with newlines for readability, we remove these so we can
 4084         # match it with the key that we used.
 4085         cert_text = cert_text.replace(os.linesep, '')
 4086         self.assertEqual(idp_public_key, cert_text)
 4087 
 4088     def _create_generate_saml_request(self, token_id, sp_id):
 4089         return {
 4090             "auth": {
 4091                 "identity": {
 4092                     "methods": [
 4093                         "token"
 4094                     ],
 4095                     "token": {
 4096                         "id": token_id
 4097                     }
 4098                 },
 4099                 "scope": {
 4100                     "service_provider": {
 4101                         "id": sp_id
 4102                     }
 4103                 }
 4104             }
 4105         }
 4106 
 4107     def _fetch_valid_token(self):
 4108         auth_data = self.build_authentication_request(
 4109             user_id=self.user['id'],
 4110             password=self.user['password'],
 4111             project_id=self.project['id'])
 4112         resp = self.v3_create_token(auth_data)
 4113         token_id = resp.headers.get('X-Subject-Token')
 4114         return token_id
 4115 
 4116     def _fetch_domain_scoped_token(self):
 4117         auth_data = self.build_authentication_request(
 4118             user_id=self.user['id'],
 4119             password=self.user['password'],
 4120             user_domain_id=self.domain['id'])
 4121         resp = self.v3_create_token(auth_data)
 4122         token_id = resp.headers.get('X-Subject-Token')
 4123         return token_id
 4124 
 4125     def test_not_project_scoped_token(self):
 4126         """Ensure SAML generation fails when passing domain-scoped tokens.
 4127 
 4128         The server should return a 403 Forbidden Action.
 4129 
 4130         """
 4131         self.config_fixture.config(group='saml', idp_entity_id=self.ISSUER)
 4132         token_id = self._fetch_domain_scoped_token()
 4133         body = self._create_generate_saml_request(token_id,
 4134                                                   self.SERVICE_PROVDIER_ID)
 4135         with mock.patch.object(keystone_idp, '_sign_assertion',
 4136                                return_value=self.signed_assertion):
 4137             self.post(self.SAML_GENERATION_ROUTE, body=body,
 4138                       expected_status=http.client.FORBIDDEN)
 4139 
 4140     def test_generate_saml_route(self):
 4141         """Test that the SAML generation endpoint produces XML.
 4142 
 4143         The SAML endpoint /v3/auth/OS-FEDERATION/saml2 should take as input,
 4144         a scoped token ID, and a Service Provider ID.
 4145         The controller should fetch details about the user from the token,
 4146         and details about the service provider from its ID.
 4147         This should be enough information to invoke the SAML generator and
 4148         provide a valid SAML (XML) document back.
 4149 
 4150         """
 4151         self.config_fixture.config(group='saml', idp_entity_id=self.ISSUER)
 4152         token_id = self._fetch_valid_token()
 4153         body = self._create_generate_saml_request(token_id,
 4154                                                   self.SERVICE_PROVDIER_ID)
 4155 
 4156         with mock.patch.object(keystone_idp, '_sign_assertion',
 4157                                return_value=self.signed_assertion):
 4158             http_response = self.post(self.SAML_GENERATION_ROUTE, body=body,
 4159                                       response_content_type='text/xml',
 4160                                       expected_status=http.client.OK)
 4161 
 4162         response = etree.fromstring(http_response.result)
 4163         issuer = response[0]
 4164         assertion = response[2]
 4165 
 4166         self.assertEqual(self.RECIPIENT, response.get('Destination'))
 4167         self.assertEqual(self.ISSUER, issuer.text)
 4168 
 4169         # NOTE(stevemar): We should test this against expected values,
 4170         # but the self.xyz attribute names are uuids, and we mock out
 4171         # the result. Ideally we should update the mocked result with
 4172         # some known data, and create the roles/project/user before
 4173         # these tests run.
 4174         user_attribute = assertion[4][0]
 4175         self.assertIsInstance(user_attribute[0].text, str)
 4176 
 4177         user_domain_attribute = assertion[4][1]
 4178         self.assertIsInstance(user_domain_attribute[0].text, str)
 4179 
 4180         role_attribute = assertion[4][2]
 4181         self.assertIsInstance(role_attribute[0].text, str)
 4182 
 4183         project_attribute = assertion[4][3]
 4184         self.assertIsInstance(project_attribute[0].text, str)
 4185 
 4186         project_domain_attribute = assertion[4][4]
 4187         self.assertIsInstance(project_domain_attribute[0].text, str)
 4188 
 4189         group_attribute = assertion[4][5]
 4190         self.assertIsInstance(group_attribute[0].text, str)
 4191 
 4192     def test_invalid_scope_body(self):
 4193         """Test that missing the scope in request body raises an exception.
 4194 
 4195         Raises exception.SchemaValidationError() - error 400 Bad Request
 4196 
 4197         """
 4198         token_id = uuid.uuid4().hex
 4199         body = self._create_generate_saml_request(token_id,
 4200                                                   self.SERVICE_PROVDIER_ID)
 4201         del body['auth']['scope']
 4202 
 4203         self.post(self.SAML_GENERATION_ROUTE, body=body,
 4204                   expected_status=http.client.BAD_REQUEST)
 4205 
 4206     def test_invalid_token_body(self):
 4207         """Test that missing the token in request body raises an exception.
 4208 
 4209         Raises exception.SchemaValidationError() - error 400 Bad Request
 4210 
 4211         """
 4212         token_id = uuid.uuid4().hex
 4213         body = self._create_generate_saml_request(token_id,
 4214                                                   self.SERVICE_PROVDIER_ID)
 4215         del body['auth']['identity']['token']
 4216 
 4217         self.post(self.SAML_GENERATION_ROUTE, body=body,
 4218                   expected_status=http.client.BAD_REQUEST)
 4219 
 4220     def test_sp_not_found(self):
 4221         """Test SAML generation with an invalid service provider ID.
 4222 
 4223         Raises exception.ServiceProviderNotFound() - error Not Found 404
 4224 
 4225         """
 4226         sp_id = uuid.uuid4().hex
 4227         token_id = self._fetch_valid_token()
 4228         body = self._create_generate_saml_request(token_id, sp_id)
 4229         self.post(self.SAML_GENERATION_ROUTE, body=body,
 4230                   expected_status=http.client.NOT_FOUND)
 4231 
 4232     def test_sp_disabled(self):
 4233         """Try generating assertion for disabled Service Provider."""
 4234         # Disable Service Provider
 4235         sp_ref = {'enabled': False}
 4236         PROVIDERS.federation_api.update_sp(self.SERVICE_PROVDIER_ID, sp_ref)
 4237 
 4238         token_id = self._fetch_valid_token()
 4239         body = self._create_generate_saml_request(token_id,
 4240                                                   self.SERVICE_PROVDIER_ID)
 4241         self.post(self.SAML_GENERATION_ROUTE, body=body,
 4242                   expected_status=http.client.FORBIDDEN)
 4243 
 4244     def test_token_not_found(self):
 4245         """Test that an invalid token in the request body raises an exception.
 4246 
 4247         Raises exception.TokenNotFound() - error Not Found 404
 4248 
 4249         """
 4250         token_id = uuid.uuid4().hex
 4251         body = self._create_generate_saml_request(token_id,
 4252                                                   self.SERVICE_PROVDIER_ID)
 4253         self.post(self.SAML_GENERATION_ROUTE, body=body,
 4254                   expected_status=http.client.NOT_FOUND)
 4255 
 4256     def test_generate_ecp_route(self):
 4257         """Test that the ECP generation endpoint produces XML.
 4258 
 4259         The ECP endpoint /v3/auth/OS-FEDERATION/saml2/ecp should take the same
 4260         input as the SAML generation endpoint (scoped token ID + Service
 4261         Provider ID).
 4262         The controller should return a SAML assertion that is wrapped in a
 4263         SOAP envelope.
 4264         """
 4265         self.config_fixture.config(group='saml', idp_entity_id=self.ISSUER)
 4266         token_id = self._fetch_valid_token()
 4267         body = self._create_generate_saml_request(token_id,
 4268                                                   self.SERVICE_PROVDIER_ID)
 4269 
 4270         with mock.patch.object(keystone_idp, '_sign_assertion',
 4271                                return_value=self.signed_assertion):
 4272             http_response = self.post(self.ECP_GENERATION_ROUTE, body=body,
 4273                                       response_content_type='text/xml',
 4274                                       expected_status=http.client.OK)
 4275 
 4276         env_response = etree.fromstring(http_response.result)
 4277         header = env_response[0]
 4278 
 4279         # Verify the relay state starts with 'ss:mem'
 4280         prefix = CONF.saml.relay_state_prefix
 4281         self.assertThat(header[0].text, matchers.StartsWith(prefix))
 4282 
 4283         # Verify that the content in the body matches the expected assertion
 4284         body = env_response[1]
 4285         response = body[0]
 4286         issuer = response[0]
 4287         assertion = response[2]
 4288 
 4289         self.assertEqual(self.RECIPIENT, response.get('Destination'))
 4290         self.assertEqual(self.ISSUER, issuer.text)
 4291 
 4292         user_attribute = assertion[4][0]
 4293         self.assertIsInstance(user_attribute[0].text, str)
 4294 
 4295         user_domain_attribute = assertion[4][1]
 4296         self.assertIsInstance(user_domain_attribute[0].text, str)
 4297 
 4298         role_attribute = assertion[4][2]
 4299         self.assertIsInstance(role_attribute[0].text, str)
 4300 
 4301         project_attribute = assertion[4][3]
 4302         self.assertIsInstance(project_attribute[0].text, str)
 4303 
 4304         project_domain_attribute = assertion[4][4]
 4305         self.assertIsInstance(project_domain_attribute[0].text, str)
 4306 
 4307         group_attribute = assertion[4][5]
 4308         self.assertIsInstance(group_attribute[0].text, str)
 4309 
 4310     @mock.patch('saml2.create_class_from_xml_string')
 4311     @mock.patch('oslo_utils.fileutils.write_to_tempfile')
 4312     @mock.patch.object(subprocess, 'check_output')
 4313     def test_sign_assertion(self, check_output_mock,
 4314                             write_to_tempfile_mock, create_class_mock):
 4315         write_to_tempfile_mock.return_value = 'tmp_path'
 4316         check_output_mock.return_value = 'fakeoutput'
 4317 
 4318         keystone_idp._sign_assertion(self.signed_assertion)
 4319 
 4320         create_class_mock.assert_called_with(saml.Assertion, 'fakeoutput')
 4321 
 4322     @mock.patch('oslo_utils.fileutils.write_to_tempfile')
 4323     def test_sign_assertion_exc(self, write_to_tempfile_mock):
 4324         # If the command fails the command output is logged.
 4325         sample_returncode = 1
 4326         sample_output = self.getUniqueString()
 4327         write_to_tempfile_mock.return_value = 'tmp_path'
 4328 
 4329         def side_effect(*args, **kwargs):
 4330             if args[0] == ['/usr/bin/which', CONF.saml.xmlsec1_binary]:
 4331                 return '/usr/bin/xmlsec1\n'
 4332             else:
 4333                 raise subprocess.CalledProcessError(
 4334                     returncode=sample_returncode, cmd=CONF.saml.xmlsec1_binary,
 4335                     output=sample_output
 4336                 )
 4337 
 4338         with mock.patch.object(subprocess, 'check_output',
 4339                                side_effect=side_effect):
 4340             logger_fixture = self.useFixture(fixtures.LoggerFixture())
 4341             self.assertRaises(
 4342                 exception.SAMLSigningError,
 4343                 keystone_idp._sign_assertion,
 4344                 self.signed_assertion
 4345             )
 4346 
 4347             # The function __str__ in subprocess.CalledProcessError is
 4348             # different between py3.6 and lower python version.
 4349             expected_log = (
 4350                 "Error when signing assertion, reason: Command '%s' returned "
 4351                 "non-zero exit status %s\.? %s\n" %
 4352                 (CONF.saml.xmlsec1_binary, sample_returncode, sample_output))
 4353             self.assertRegex(logger_fixture.output,
 4354                              re.compile(r'%s' % expected_log))
 4355 
 4356     @mock.patch('oslo_utils.fileutils.write_to_tempfile')
 4357     @mock.patch.object(subprocess, 'check_output')
 4358     def test_sign_assertion_fileutils_exc(self, check_output_mock,
 4359                                           write_to_tempfile_mock):
 4360         exception_msg = 'fake'
 4361         write_to_tempfile_mock.side_effect = Exception(exception_msg)
 4362         check_output_mock.return_value = '/usr/bin/xmlsec1'
 4363 
 4364         logger_fixture = self.useFixture(fixtures.LoggerFixture())
 4365         self.assertRaises(exception.SAMLSigningError,
 4366                           keystone_idp._sign_assertion,
 4367                           self.signed_assertion)
 4368         expected_log = (
 4369             'Error when signing assertion, reason: %s\n' % exception_msg)
 4370         self.assertEqual(expected_log, logger_fixture.output)
 4371 
 4372     def test_sign_assertion_logs_message_if_xmlsec1_is_not_installed(self):
 4373         with mock.patch.object(subprocess, 'check_output') as co_mock:
 4374             co_mock.side_effect = subprocess.CalledProcessError(
 4375                 returncode=1, cmd=CONF.saml.xmlsec1_binary,
 4376             )
 4377             logger_fixture = self.useFixture(fixtures.LoggerFixture())
 4378             self.assertRaises(
 4379                 exception.SAMLSigningError,
 4380                 keystone_idp._sign_assertion,
 4381                 self.signed_assertion
 4382             )
 4383 
 4384             expected_log = ('Unable to locate xmlsec1 binary on the system. '
 4385                             'Check to make sure it is installed.\n')
 4386             self.assertEqual(expected_log, logger_fixture.output)
 4387 
 4388 
 4389 class IdPMetadataGenerationTests(test_v3.RestfulTestCase):
 4390     """A class for testing Identity Provider Metadata generation."""
 4391 
 4392     METADATA_URL = '/OS-FEDERATION/saml2/metadata'
 4393 
 4394     def setUp(self):
 4395         super(IdPMetadataGenerationTests, self).setUp()
 4396         self.generator = keystone_idp.MetadataGenerator()
 4397 
 4398     def config_overrides(self):
 4399         super(IdPMetadataGenerationTests, self).config_overrides()
 4400         self.config_fixture.config(
 4401             group='saml',
 4402             idp_entity_id=federation_fixtures.IDP_ENTITY_ID,
 4403             idp_sso_endpoint=federation_fixtures.IDP_SSO_ENDPOINT,
 4404             idp_organization_name=federation_fixtures.IDP_ORGANIZATION_NAME,
 4405             idp_organization_display_name=(
 4406                 federation_fixtures.IDP_ORGANIZATION_DISPLAY_NAME),
 4407             idp_organization_url=federation_fixtures.IDP_ORGANIZATION_URL,
 4408             idp_contact_company=federation_fixtures.IDP_CONTACT_COMPANY,
 4409             idp_contact_name=federation_fixtures.IDP_CONTACT_GIVEN_NAME,
 4410             idp_contact_surname=federation_fixtures.IDP_CONTACT_SURNAME,
 4411             idp_contact_email=federation_fixtures.IDP_CONTACT_EMAIL,
 4412             idp_contact_telephone=(
 4413                 federation_fixtures.IDP_CONTACT_TELEPHONE_NUMBER),
 4414             idp_contact_type=federation_fixtures.IDP_CONTACT_TYPE)
 4415 
 4416     def test_check_entity_id(self):
 4417         metadata = self.generator.generate_metadata()
 4418         self.assertEqual(federation_fixtures.IDP_ENTITY_ID, metadata.entity_id)
 4419 
 4420     def test_metadata_validity(self):
 4421         """Call md.EntityDescriptor method that does internal verification."""
 4422         self.generator.generate_metadata().verify()
 4423 
 4424     def test_serialize_metadata_object(self):
 4425         """Check whether serialization doesn't raise any exceptions."""
 4426         self.generator.generate_metadata().to_string()
 4427         # TODO(marek-denis): Check values here
 4428 
 4429     def test_check_idp_sso(self):
 4430         metadata = self.generator.generate_metadata()
 4431         idpsso_descriptor = metadata.idpsso_descriptor
 4432         self.assertIsNotNone(metadata.idpsso_descriptor)
 4433         self.assertEqual(federation_fixtures.IDP_SSO_ENDPOINT,
 4434                          idpsso_descriptor.single_sign_on_service.location)
 4435 
 4436         self.assertIsNotNone(idpsso_descriptor.organization)
 4437         organization = idpsso_descriptor.organization
 4438         self.assertEqual(federation_fixtures.IDP_ORGANIZATION_DISPLAY_NAME,
 4439                          organization.organization_display_name.text)
 4440         self.assertEqual(federation_fixtures.IDP_ORGANIZATION_NAME,
 4441                          organization.organization_name.text)
 4442         self.assertEqual(federation_fixtures.IDP_ORGANIZATION_URL,
 4443                          organization.organization_url.text)
 4444 
 4445         self.assertIsNotNone(idpsso_descriptor.contact_person)
 4446         contact_person = idpsso_descriptor.contact_person
 4447 
 4448         self.assertEqual(federation_fixtures.IDP_CONTACT_GIVEN_NAME,
 4449                          contact_person.given_name.text)
 4450         self.assertEqual(federation_fixtures.IDP_CONTACT_SURNAME,
 4451                          contact_person.sur_name.text)
 4452         self.assertEqual(federation_fixtures.IDP_CONTACT_EMAIL,
 4453                          contact_person.email_address.text)
 4454         self.assertEqual(federation_fixtures.IDP_CONTACT_TELEPHONE_NUMBER,
 4455                          contact_person.telephone_number.text)
 4456         self.assertEqual(federation_fixtures.IDP_CONTACT_TYPE,
 4457                          contact_person.contact_type)
 4458 
 4459     def test_metadata_no_organization(self):
 4460         self.config_fixture.config(
 4461             group='saml',
 4462             idp_organization_display_name=None,
 4463             idp_organization_url=None,
 4464             idp_organization_name=None)
 4465         metadata = self.generator.generate_metadata()
 4466         idpsso_descriptor = metadata.idpsso_descriptor
 4467         self.assertIsNotNone(metadata.idpsso_descriptor)
 4468         self.assertIsNone(idpsso_descriptor.organization)
 4469         self.assertIsNotNone(idpsso_descriptor.contact_person)
 4470 
 4471     def test_metadata_no_contact_person(self):
 4472         self.config_fixture.config(
 4473             group='saml',
 4474             idp_contact_name=None,
 4475             idp_contact_surname=None,
 4476             idp_contact_email=None,
 4477             idp_contact_telephone=None)
 4478         metadata = self.generator.generate_metadata()
 4479         idpsso_descriptor = metadata.idpsso_descriptor
 4480         self.assertIsNotNone(metadata.idpsso_descriptor)
 4481         self.assertIsNotNone(idpsso_descriptor.organization)
 4482         self.assertEqual([], idpsso_descriptor.contact_person)
 4483 
 4484     def test_metadata_invalid_idp_sso_endpoint(self):
 4485         self.config_fixture.config(
 4486             group='saml',
 4487             idp_sso_endpoint=None)
 4488         self.assertRaises(exception.ValidationError,
 4489                           self.generator.generate_metadata)
 4490 
 4491     def test_metadata_invalid_idp_entity_id(self):
 4492         self.config_fixture.config(
 4493             group='saml',
 4494             idp_entity_id=None)
 4495         self.assertRaises(exception.ValidationError,
 4496                           self.generator.generate_metadata)
 4497 
 4498     def test_get_metadata_with_no_metadata_file_configured(self):
 4499         self.get(self.METADATA_URL,
 4500                  expected_status=http.client.INTERNAL_SERVER_ERROR)
 4501 
 4502     def test_get_head_metadata(self):
 4503         self.config_fixture.config(
 4504             group='saml', idp_metadata_path=XMLDIR + '/idp_saml2_metadata.xml')
 4505         self.head(self.METADATA_URL, expected_status=http.client.OK)
 4506         r = self.get(self.METADATA_URL, response_content_type='text/xml')
 4507         self.assertEqual('text/xml', r.headers.get('Content-Type'))
 4508 
 4509         reference_file = _load_xml('idp_saml2_metadata.xml')
 4510 
 4511         # `reference_file` needs to be converted to bytes to be able to be
 4512         # compared to `r.result` in the case of Python 3.
 4513         reference_file = str.encode(reference_file)
 4514         self.assertEqual(reference_file, r.result)
 4515 
 4516 
 4517 class ServiceProviderTests(test_v3.RestfulTestCase):
 4518     """A test class for Service Providers."""
 4519 
 4520     MEMBER_NAME = 'service_provider'
 4521     COLLECTION_NAME = 'service_providers'
 4522     SERVICE_PROVIDER_ID = 'ACME'
 4523     SP_KEYS = ['auth_url', 'id', 'enabled', 'description',
 4524                'relay_state_prefix', 'sp_url']
 4525 
 4526     def setUp(self):
 4527         super(ServiceProviderTests, self).setUp()
 4528         # Add a Service Provider
 4529         url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
 4530         self.SP_REF = core.new_service_provider_ref()
 4531         self.SERVICE_PROVIDER = self.put(
 4532             url, body={'service_provider': self.SP_REF},
 4533             expected_status=http.client.CREATED).result
 4534 
 4535     def base_url(self, suffix=None):
 4536         if suffix is not None:
 4537             return '/OS-FEDERATION/service_providers/' + str(suffix)
 4538         return '/OS-FEDERATION/service_providers'
 4539 
 4540     def _create_default_sp(self, body=None):
 4541         """Create default Service Provider."""
 4542         url = self.base_url(suffix=uuid.uuid4().hex)
 4543         if body is None:
 4544             body = core.new_service_provider_ref()
 4545         resp = self.put(url, body={'service_provider': body},
 4546                         expected_status=http.client.CREATED)
 4547         return resp
 4548 
 4549     def test_get_head_service_provider(self):
 4550         url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
 4551         resp = self.get(url)
 4552         self.assertValidEntity(resp.result['service_provider'],
 4553                                keys_to_check=self.SP_KEYS)
 4554         resp = self.head(url, expected_status=http.client.OK)
 4555 
 4556     def test_get_service_provider_fail(self):
 4557         url = self.base_url(suffix=uuid.uuid4().hex)
 4558         self.get(url, expected_status=http.client.NOT_FOUND)
 4559 
 4560     def test_create_service_provider(self):
 4561         url = self.base_url(suffix=uuid.uuid4().hex)
 4562         sp = core.new_service_provider_ref()
 4563         resp = self.put(url, body={'service_provider': sp},
 4564                         expected_status=http.client.CREATED)
 4565         self.assertValidEntity(resp.result['service_provider'],
 4566                                keys_to_check=self.SP_KEYS)
 4567 
 4568     @unit.skip_if_cache_disabled('federation')
 4569     def test_create_service_provider_invalidates_cache(self):
 4570         # List all service providers and make sure we only have one in the
 4571         # list. This service provider is from testing setup.
 4572         resp = self.get(self.base_url(), expected_status=http.client.OK)
 4573         self.assertThat(
 4574             resp.json_body['service_providers'],
 4575             matchers.HasLength(1)
 4576         )
 4577 
 4578         # Create a new service provider.
 4579         url = self.base_url(suffix=uuid.uuid4().hex)
 4580         sp = core.new_service_provider_ref()
 4581         self.put(url, body={'service_provider': sp},
 4582                  expected_status=http.client.CREATED)
 4583 
 4584         # List all service providers again and make sure we have two in the
 4585         # returned list.
 4586         resp = self.get(self.base_url(), expected_status=http.client.OK)
 4587         self.assertThat(
 4588             resp.json_body['service_providers'],
 4589             matchers.HasLength(2)
 4590         )
 4591 
 4592     @unit.skip_if_cache_disabled('federation')
 4593     def test_delete_service_provider_invalidates_cache(self):
 4594         # List all service providers and make sure we only have one in the
 4595         # list. This service provider is from testing setup.
 4596         resp = self.get(self.base_url(), expected_status=http.client.OK)
 4597         self.assertThat(
 4598             resp.json_body['service_providers'],
 4599             matchers.HasLength(1)
 4600         )
 4601 
 4602         # Create a new service provider.
 4603         url = self.base_url(suffix=uuid.uuid4().hex)
 4604         sp = core.new_service_provider_ref()
 4605         self.put(url, body={'service_provider': sp},
 4606                  expected_status=http.client.CREATED)
 4607 
 4608         # List all service providers again and make sure we have two in the
 4609         # returned list.
 4610         resp = self.get(self.base_url(), expected_status=http.client.OK)
 4611         self.assertThat(
 4612             resp.json_body['service_providers'],
 4613             matchers.HasLength(2)
 4614         )
 4615 
 4616         # Delete the service provider we created, which should invalidate the
 4617         # service provider cache. Get the list of service providers again and
 4618         # if the cache invalidated properly then we should only have one
 4619         # service provider in the list.
 4620         self.delete(url, expected_status=http.client.NO_CONTENT)
 4621         resp = self.get(self.base_url(), expected_status=http.client.OK)
 4622         self.assertThat(
 4623             resp.json_body['service_providers'],
 4624             matchers.HasLength(1)
 4625         )
 4626 
 4627     @unit.skip_if_cache_disabled('federation')
 4628     def test_update_service_provider_invalidates_cache(self):
 4629         # List all service providers and make sure we only have one in the
 4630         # list. This service provider is from testing setup.
 4631         resp = self.get(self.base_url(), expected_status=http.client.OK)
 4632         self.assertThat(
 4633             resp.json_body['service_providers'],
 4634             matchers.HasLength(1)
 4635         )
 4636 
 4637         # Create a new service provider.
 4638         service_provider_id = uuid.uuid4().hex
 4639         url = self.base_url(suffix=service_provider_id)
 4640         sp = core.new_service_provider_ref()
 4641         self.put(url, body={'service_provider': sp},
 4642                  expected_status=http.client.CREATED)
 4643 
 4644         # List all service providers again and make sure we have two in the
 4645         # returned list.
 4646         resp = self.get(self.base_url(), expected_status=http.client.OK)
 4647         self.assertThat(
 4648             resp.json_body['service_providers'],
 4649             matchers.HasLength(2)
 4650         )
 4651 
 4652         # Update the service provider we created, which should invalidate the
 4653         # service provider cache. Get the list of service providers again and
 4654         # if the cache invalidated properly then we see the value we updated.
 4655         updated_description = uuid.uuid4().hex
 4656         body = {'service_provider': {'description': updated_description}}
 4657         self.patch(url, body=body, expected_status=http.client.OK)
 4658         resp = self.get(self.base_url(), expected_status=http.client.OK)
 4659         self.assertThat(
 4660             resp.json_body['service_providers'],
 4661             matchers.HasLength(2)
 4662         )
 4663         for sp in resp.json_body['service_providers']:
 4664             if sp['id'] == service_provider_id:
 4665                 self.assertEqual(sp['description'], updated_description)
 4666 
 4667     def test_create_sp_relay_state_default(self):
 4668         """Create an SP without relay state, should default to `ss:mem`."""
 4669         url = self.base_url(suffix=uuid.uuid4().hex)
 4670         sp = core.new_service_provider_ref()
 4671         del sp['relay_state_prefix']
 4672         resp = self.put(url, body={'service_provider': sp},
 4673                         expected_status=http.client.CREATED)
 4674         sp_result = resp.result['service_provider']
 4675         self.assertEqual(CONF.saml.relay_state_prefix,
 4676                          sp_result['relay_state_prefix'])
 4677 
 4678     def test_create_sp_relay_state_non_default(self):
 4679         """Create an SP with custom relay state."""
 4680         url = self.base_url(suffix=uuid.uuid4().hex)
 4681         sp = core.new_service_provider_ref()
 4682         non_default_prefix = uuid.uuid4().hex
 4683         sp['relay_state_prefix'] = non_default_prefix
 4684         resp = self.put(url, body={'service_provider': sp},
 4685                         expected_status=http.client.CREATED)
 4686         sp_result = resp.result['service_provider']
 4687         self.assertEqual(non_default_prefix,
 4688                          sp_result['relay_state_prefix'])
 4689 
 4690     def test_create_service_provider_fail(self):
 4691         """Try adding SP object with unallowed attribute."""
 4692         url = self.base_url(suffix=uuid.uuid4().hex)
 4693         sp = core.new_service_provider_ref()
 4694         sp[uuid.uuid4().hex] = uuid.uuid4().hex
 4695         self.put(url, body={'service_provider': sp},
 4696                  expected_status=http.client.BAD_REQUEST)
 4697 
 4698     def test_list_head_service_providers(self):
 4699         """Test listing of service provider objects.
 4700 
 4701         Add two new service providers. List all available service providers.
 4702         Expect to get list of three service providers (one created by setUp())
 4703         Test if attributes match.
 4704 
 4705         """
 4706         ref_service_providers = {
 4707             uuid.uuid4().hex: core.new_service_provider_ref(),
 4708             uuid.uuid4().hex: core.new_service_provider_ref(),
 4709         }
 4710         for id, sp in ref_service_providers.items():
 4711             url = self.base_url(suffix=id)
 4712             self.put(url, body={'service_provider': sp},
 4713                      expected_status=http.client.CREATED)
 4714 
 4715         # Insert ids into service provider object, we will compare it with
 4716         # responses from server and those include 'id' attribute.
 4717 
 4718         ref_service_providers[self.SERVICE_PROVIDER_ID] = self.SP_REF
 4719         for id, sp in ref_service_providers.items():
 4720             sp['id'] = id
 4721 
 4722         url = self.base_url()
 4723         resp = self.get(url)
 4724         service_providers = resp.result
 4725         for service_provider in service_providers['service_providers']:
 4726             id = service_provider['id']
 4727             self.assertValidEntity(
 4728                 service_provider, ref=ref_service_providers[id],
 4729                 keys_to_check=self.SP_KEYS)
 4730 
 4731         self.head(url, expected_status=http.client.OK)
 4732 
 4733     def test_update_service_provider(self):
 4734         """Update existing service provider.
 4735 
 4736         Update default existing service provider and make sure it has been
 4737         properly changed.
 4738 
 4739         """
 4740         new_sp_ref = core.new_service_provider_ref()
 4741         url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
 4742         resp = self.patch(url, body={'service_provider': new_sp_ref})
 4743         patch_result = resp.result
 4744         new_sp_ref['id'] = self.SERVICE_PROVIDER_ID
 4745         self.assertValidEntity(patch_result['service_provider'],
 4746                                ref=new_sp_ref,
 4747                                keys_to_check=self.SP_KEYS)
 4748 
 4749         resp = self.get(url)
 4750         get_result = resp.result
 4751 
 4752         self.assertDictEqual(patch_result['service_provider'],
 4753                              get_result['service_provider'])
 4754 
 4755     def test_update_service_provider_immutable_parameters(self):
 4756         """Update immutable attributes in service provider.
 4757 
 4758         In this particular case the test will try to change ``id`` attribute.
 4759         The server should return an HTTP 403 Forbidden error code.
 4760 
 4761         """
 4762         new_sp_ref = {'id': uuid.uuid4().hex}
 4763         url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
 4764         self.patch(url, body={'service_provider': new_sp_ref},
 4765                    expected_status=http.client.BAD_REQUEST)
 4766 
 4767     def test_update_service_provider_unknown_parameter(self):
 4768         new_sp_ref = core.new_service_provider_ref()
 4769         new_sp_ref[uuid.uuid4().hex] = uuid.uuid4().hex
 4770         url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
 4771         self.patch(url, body={'service_provider': new_sp_ref},
 4772                    expected_status=http.client.BAD_REQUEST)
 4773 
 4774     def test_update_service_provider_returns_not_found(self):
 4775         new_sp_ref = core.new_service_provider_ref()
 4776         new_sp_ref['description'] = uuid.uuid4().hex
 4777         url = self.base_url(suffix=uuid.uuid4().hex)
 4778         self.patch(url, body={'service_provider': new_sp_ref},
 4779                    expected_status=http.client.NOT_FOUND)
 4780 
 4781     def test_update_sp_relay_state(self):
 4782         """Update an SP with custom relay state."""
 4783         new_sp_ref = core.new_service_provider_ref()
 4784         non_default_prefix = uuid.uuid4().hex
 4785         new_sp_ref['relay_state_prefix'] = non_default_prefix
 4786         url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
 4787         resp = self.patch(url, body={'service_provider': new_sp_ref})
 4788         sp_result = resp.result['service_provider']
 4789         self.assertEqual(non_default_prefix,
 4790                          sp_result['relay_state_prefix'])
 4791 
 4792     def test_delete_service_provider(self):
 4793         url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
 4794         self.delete(url)
 4795 
 4796     def test_delete_service_provider_returns_not_found(self):
 4797         url = self.base_url(suffix=uuid.uuid4().hex)
 4798         self.delete(url, expected_status=http.client.NOT_FOUND)
 4799 
 4800     def test_filter_list_sp_by_id(self):
 4801         def get_id(resp):
 4802             sp = resp.result.get('service_provider')
 4803             return sp.get('id')
 4804 
 4805         sp1_id = get_id(self._create_default_sp())
 4806         sp2_id = get_id(self._create_default_sp())
 4807 
 4808         # list the SP, should get SPs.
 4809         url = self.base_url()
 4810         resp = self.get(url)
 4811         sps = resp.result.get('service_providers')
 4812         entities_ids = [e['id'] for e in sps]
 4813         self.assertIn(sp1_id, entities_ids)
 4814         self.assertIn(sp2_id, entities_ids)
 4815 
 4816         # filter the SP by 'id'. Only SP1 should appear.
 4817         url = self.base_url() + '?id=' + sp1_id
 4818         resp = self.get(url)
 4819         sps = resp.result.get('service_providers')
 4820         entities_ids = [e['id'] for e in sps]
 4821         self.assertIn(sp1_id, entities_ids)
 4822         self.assertNotIn(sp2_id, entities_ids)
 4823 
 4824     def test_filter_list_sp_by_enabled(self):
 4825         def get_id(resp):
 4826             sp = resp.result.get('service_provider')
 4827             return sp.get('id')
 4828 
 4829         sp1_id = get_id(self._create_default_sp())
 4830         sp2_ref = core.new_service_provider_ref()
 4831         sp2_ref['enabled'] = False
 4832         sp2_id = get_id(self._create_default_sp(body=sp2_ref))
 4833 
 4834         # list the SP, should get two SPs.
 4835         url = self.base_url()
 4836         resp = self.get(url)
 4837         sps = resp.result.get('service_providers')
 4838         entities_ids = [e['id'] for e in sps]
 4839         self.assertIn(sp1_id, entities_ids)
 4840         self.assertIn(sp2_id, entities_ids)
 4841 
 4842         # filter the SP by 'enabled'. Only SP1 should appear.
 4843         url = self.base_url() + '?enabled=True'
 4844         resp = self.get(url)
 4845         sps = resp.result.get('service_providers')
 4846         entities_ids = [e['id'] for e in sps]
 4847         self.assertIn(sp1_id, entities_ids)
 4848         self.assertNotIn(sp2_id, entities_ids)
 4849 
 4850 
 4851 class WebSSOTests(FederatedTokenTests):
 4852     """A class for testing Web SSO."""
 4853 
 4854     SSO_URL = '/auth/OS-FEDERATION/websso/'
 4855     SSO_TEMPLATE_NAME = 'sso_callback_template.html'
 4856     SSO_TEMPLATE_PATH = os.path.join(core.dirs.etc(), SSO_TEMPLATE_NAME)
 4857     TRUSTED_DASHBOARD = 'http://horizon.com'
 4858     ORIGIN = urllib.parse.quote_plus(TRUSTED_DASHBOARD)
 4859     PROTOCOL_REMOTE_ID_ATTR = uuid.uuid4().hex
 4860 
 4861     def config_overrides(self):
 4862         super(WebSSOTests, self).config_overrides()
 4863         self.config_fixture.config(
 4864             group='federation',
 4865             trusted_dashboard=[self.TRUSTED_DASHBOARD],
 4866             sso_callback_template=self.SSO_TEMPLATE_PATH,
 4867             remote_id_attribute=self.REMOTE_ID_ATTR)
 4868 
 4869     def test_render_callback_template(self):
 4870         token_id = uuid.uuid4().hex
 4871         with self.make_request():
 4872             resp = (
 4873                 auth_api._AuthFederationWebSSOBase._render_template_response(
 4874                     self.TRUSTED_DASHBOARD, token_id))
 4875         # The expected value in the assertions bellow need to be 'str' in
 4876         # Python 2 and 'bytes' in Python 3
 4877         self.assertIn(token_id.encode('utf-8'), resp.data)
 4878         self.assertIn(self.TRUSTED_DASHBOARD.encode('utf-8'), resp.data)
 4879 
 4880     def test_federated_sso_auth(self):
 4881         environment = {self.REMOTE_ID_ATTR: self.REMOTE_IDS[0],
 4882                        'QUERY_STRING': 'origin=%s' % self.ORIGIN}
 4883         environment.update(mapping_fixtures.EMPLOYEE_ASSERTION)
 4884         with self.make_request(environ=environment):
 4885             resp = auth_api.AuthFederationWebSSOResource._perform_auth(
 4886                 self.PROTOCOL)
 4887         # `resp.data` will be `str` in Python 2 and `bytes` in Python 3
 4888         # which is why expected value: `self.TRUSTED_DASHBOARD`
 4889         # needs to be encoded
 4890         self.assertIn(self.TRUSTED_DASHBOARD.encode('utf-8'), resp.data)
 4891 
 4892     def test_get_sso_origin_host_case_insensitive(self):
 4893         # test lowercase hostname in trusted_dashboard
 4894         environ = {'QUERY_STRING': 'origin=http://horizon.com'}
 4895         with self.make_request(environ=environ):
 4896             host = auth_api._get_sso_origin_host()
 4897             self.assertEqual("http://horizon.com", host)
 4898             # test uppercase hostname in trusted_dashboard
 4899             self.config_fixture.config(
 4900                 group='federation',
 4901                 trusted_dashboard=['http://Horizon.com'])
 4902             host = auth_api._get_sso_origin_host()
 4903             self.assertEqual("http://horizon.com", host)
 4904 
 4905     def test_federated_sso_auth_with_protocol_specific_remote_id(self):
 4906         self.config_fixture.config(
 4907             group=self.PROTOCOL,
 4908             remote_id_attribute=self.PROTOCOL_REMOTE_ID_ATTR)
 4909 
 4910         environment = {self.PROTOCOL_REMOTE_ID_ATTR: self.REMOTE_IDS[0],
 4911                        'QUERY_STRING': 'origin=%s' % self.ORIGIN}
 4912         environment.update(mapping_fixtures.EMPLOYEE_ASSERTION)
 4913         with self.make_request(environ=environment):
 4914             resp = auth_api.AuthFederationWebSSOResource._perform_auth(
 4915                 self.PROTOCOL)
 4916         # `resp.data` will be `str` in Python 2 and `bytes` in Python 3
 4917         # which is why expected value: `self.TRUSTED_DASHBOARD`
 4918         # needs to be encoded
 4919         self.assertIn(self.TRUSTED_DASHBOARD.encode('utf-8'), resp.data)
 4920 
 4921     def test_federated_sso_auth_bad_remote_id(self):
 4922         environment = {self.REMOTE_ID_ATTR: self.IDP,
 4923                        'QUERY_STRING': 'origin=%s' % self.ORIGIN}
 4924         environment.update(mapping_fixtures.EMPLOYEE_ASSERTION)
 4925         with self.make_request(environ=environment):
 4926             self.assertRaises(
 4927                 exception.IdentityProviderNotFound,
 4928                 auth_api.AuthFederationWebSSOResource._perform_auth,
 4929                 self.PROTOCOL)
 4930 
 4931     def test_federated_sso_missing_query(self):
 4932         environment = {self.REMOTE_ID_ATTR: self.REMOTE_IDS[0]}
 4933         environment.update(mapping_fixtures.EMPLOYEE_ASSERTION)
 4934         with self.make_request(environ=environment):
 4935             self.assertRaises(
 4936                 exception.ValidationError,
 4937                 auth_api.AuthFederationWebSSOResource._perform_auth,
 4938                 self.PROTOCOL)
 4939 
 4940     def test_federated_sso_missing_query_bad_remote_id(self):
 4941         environment = {self.REMOTE_ID_ATTR: self.IDP}
 4942         environment.update(mapping_fixtures.EMPLOYEE_ASSERTION)
 4943         with self.make_request(environ=environment):
 4944             self.assertRaises(
 4945                 exception.ValidationError,
 4946                 auth_api.AuthFederationWebSSOResource._perform_auth,
 4947                 self.PROTOCOL)
 4948 
 4949     def test_federated_sso_auth_protocol_not_found(self):
 4950         environment = {self.REMOTE_ID_ATTR: self.REMOTE_IDS[0],
 4951                        'QUERY_STRING': 'origin=%s' % self.ORIGIN}
 4952         environment.update(mapping_fixtures.EMPLOYEE_ASSERTION)
 4953         with self.make_request(environ=environment):
 4954             self.assertRaises(