"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "keystone/tests/unit/test_v3_federation.py" between
keystone-16.0.1.tar.gz and keystone-17.0.0.tar.gz

About: OpenStack Keystone (Core Service: Identity) provides an authentication and authorization service for other OpenStack services. Provides a catalog of endpoints for all OpenStack services.
The "Ussuri" series (latest release).

test_v3_federation.py  (keystone-16.0.1):test_v3_federation.py  (keystone-17.0.0)
skipping to change at line 19 skipping to change at line 19
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import copy import copy
import os import os
import random import random
import re import re
import subprocess import subprocess
from testtools import matchers from testtools import matchers
from unittest import mock
import uuid import uuid
import fixtures import fixtures
import flask import flask
import http.client
from lxml import etree from lxml import etree
import mock
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
from oslo_utils import importutils from oslo_utils import importutils
import saml2 import saml2
from saml2 import saml from saml2 import saml
from saml2 import sigver from saml2 import sigver
from six.moves import http_client import urllib
from six.moves import range, urllib, zip
xmldsig = importutils.try_import("saml2.xmldsig") xmldsig = importutils.try_import("saml2.xmldsig")
if not xmldsig: if not xmldsig:
xmldsig = importutils.try_import("xmldsig") xmldsig = importutils.try_import("xmldsig")
from keystone.api._shared import authentication from keystone.api._shared import authentication
from keystone.api import auth as auth_api from keystone.api import auth as auth_api
from keystone.common import driver_hints from keystone.common import driver_hints
from keystone.common import provider_api from keystone.common import provider_api
from keystone.common import render_token from keystone.common import render_token
import keystone.conf import keystone.conf
skipping to change at line 868 skipping to change at line 868
idp_id = idp.get('id') idp_id = idp.get('id')
return (idp_id, idp) return (idp_id, idp)
def _get_idp(self, idp_id): def _get_idp(self, idp_id):
"""Fetch IdP entity based on its id.""" """Fetch IdP entity based on its id."""
url = self.base_url(suffix=idp_id) url = self.base_url(suffix=idp_id)
resp = self.get(url) resp = self.get(url)
return resp return resp
def _create_default_idp(self, body=None, def _create_default_idp(self, body=None,
expected_status=http_client.CREATED): expected_status=http.client.CREATED):
"""Create default IdP.""" """Create default IdP."""
url = self.base_url(suffix=uuid.uuid4().hex) url = self.base_url(suffix=uuid.uuid4().hex)
if body is None: if body is None:
body = self._http_idp_input() body = self._http_idp_input()
resp = self.put(url, body={'identity_provider': body}, resp = self.put(url, body={'identity_provider': body},
expected_status=expected_status) expected_status=expected_status)
return resp return resp
def _http_idp_input(self): def _http_idp_input(self):
"""Create default input dictionary for IdP data.""" """Create default input dictionary for IdP data."""
skipping to change at line 916 skipping to change at line 916
url = self.base_url(suffix=url) url = self.base_url(suffix=url)
r = self.get(url) r = self.get(url)
return r return r
def _create_mapping(self, mapping_id): def _create_mapping(self, mapping_id):
mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER
mapping['id'] = mapping_id mapping['id'] = mapping_id
url = '/OS-FEDERATION/mappings/%s' % mapping_id url = '/OS-FEDERATION/mappings/%s' % mapping_id
self.put(url, self.put(url,
body={'mapping': mapping}, body={'mapping': mapping},
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
def assertIdpDomainCreated(self, idp_id, domain_id): def assertIdpDomainCreated(self, idp_id, domain_id):
domain = PROVIDERS.resource_api.get_domain(domain_id) domain = PROVIDERS.resource_api.get_domain(domain_id)
self.assertEqual(domain_id, domain['name']) self.assertEqual(domain_id, domain['name'])
self.assertIn(idp_id, domain['description']) self.assertIn(idp_id, domain['description'])
def test_create_idp_without_domain_id(self): def test_create_idp_without_domain_id(self):
"""Create the IdentityProvider entity associated to remote_ids.""" """Create the IdentityProvider entity associated to remote_ids."""
keys_to_check = list(self.idp_keys) keys_to_check = list(self.idp_keys)
body = self.default_body.copy() body = self.default_body.copy()
skipping to change at line 977 skipping to change at line 977
# Create an identity provider with the same ID to intentionally cause a # Create an identity provider with the same ID to intentionally cause a
# conflict, this is going to result in a domain getting created for the # conflict, this is going to result in a domain getting created for the
# new identity provider. The domain for the new identity provider is # new identity provider. The domain for the new identity provider is
# going to be created before the conflict is raised from the database # going to be created before the conflict is raised from the database
# layer. This makes sure the domain is cleaned up after a Conflict is # layer. This makes sure the domain is cleaned up after a Conflict is
# detected. # detected.
resp = self.put( resp = self.put(
self.base_url(suffix=idp_id), self.base_url(suffix=idp_id),
body={'identity_provider': self.default_body.copy()}, body={'identity_provider': self.default_body.copy()},
expected_status=http_client.CONFLICT expected_status=http.client.CONFLICT
) )
domains = PROVIDERS.resource_api.list_domains() domains = PROVIDERS.resource_api.list_domains()
self.assertEqual(number_of_domains, len(domains)) self.assertEqual(number_of_domains, len(domains))
def test_conflicting_idp_does_not_delete_existing_domain(self): def test_conflicting_idp_does_not_delete_existing_domain(self):
# Create a new domain # Create a new domain
domain = unit.new_domain_ref() domain = unit.new_domain_ref()
PROVIDERS.resource_api.create_domain(domain['id'], domain) PROVIDERS.resource_api.create_domain(domain['id'], domain)
# Create an identity provider and specify the domain # Create an identity provider and specify the domain
skipping to change at line 1004 skipping to change at line 1004
self.assertEqual(idp['domain_id'], domain['id']) self.assertEqual(idp['domain_id'], domain['id'])
# Create an identity provider with the same domain and ID to ensure a # Create an identity provider with the same domain and ID to ensure a
# Conflict is raised and then to verify the existing domain not deleted # Conflict is raised and then to verify the existing domain not deleted
# below # below
body = self.default_body.copy() body = self.default_body.copy()
body['domain_id'] = domain['id'] body['domain_id'] = domain['id']
resp = self.put( resp = self.put(
self.base_url(suffix=idp_id), self.base_url(suffix=idp_id),
body={'identity_provider': body}, body={'identity_provider': body},
expected_status=http_client.CONFLICT expected_status=http.client.CONFLICT
) )
# Make sure the domain specified in the second request was not deleted, # Make sure the domain specified in the second request was not deleted,
# since it wasn't auto-generated # since it wasn't auto-generated
self.assertIsNotNone(PROVIDERS.resource_api.get_domain(domain['id'])) self.assertIsNotNone(PROVIDERS.resource_api.get_domain(domain['id']))
def test_create_multi_idp_to_one_domain(self): def test_create_multi_idp_to_one_domain(self):
# create domain and add domain_id to keys to check # create domain and add domain_id to keys to check
domain = unit.new_domain_ref() domain = unit.new_domain_ref()
PROVIDERS.resource_api.create_domain(domain['id'], domain) PROVIDERS.resource_api.create_domain(domain['id'], domain)
skipping to change at line 1031 skipping to change at line 1031
idp1 = self._create_default_idp(body=body) idp1 = self._create_default_idp(body=body)
self.assertValidResponse(idp1, 'identity_provider', dummy_validator, self.assertValidResponse(idp1, 'identity_provider', dummy_validator,
keys_to_check=keys_to_check, keys_to_check=keys_to_check,
ref=body) ref=body)
# create a 2nd idp with the same domain_id # create a 2nd idp with the same domain_id
url = self.base_url(suffix=uuid.uuid4().hex) url = self.base_url(suffix=uuid.uuid4().hex)
body = self.default_body.copy() body = self.default_body.copy()
body['description'] = uuid.uuid4().hex body['description'] = uuid.uuid4().hex
body['domain_id'] = domain['id'] body['domain_id'] = domain['id']
idp2 = self.put(url, body={'identity_provider': body}, idp2 = self.put(url, body={'identity_provider': body},
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
self.assertValidResponse(idp2, 'identity_provider', dummy_validator, self.assertValidResponse(idp2, 'identity_provider', dummy_validator,
keys_to_check=keys_to_check, keys_to_check=keys_to_check,
ref=body) ref=body)
self.assertEqual(idp1.result['identity_provider']['domain_id'], self.assertEqual(idp1.result['identity_provider']['domain_id'],
idp2.result['identity_provider']['domain_id']) idp2.result['identity_provider']['domain_id'])
def test_cannot_update_idp_domain(self): def test_cannot_update_idp_domain(self):
# create new idp # create new idp
body = self.default_body.copy() body = self.default_body.copy()
skipping to change at line 1053 skipping to change at line 1053
default_idp = self._fetch_attribute_from_response(default_resp, default_idp = self._fetch_attribute_from_response(default_resp,
'identity_provider') 'identity_provider')
idp_id = default_idp.get('id') idp_id = default_idp.get('id')
self.assertIsNotNone(idp_id) self.assertIsNotNone(idp_id)
# create domain and try to update the idp's domain # create domain and try to update the idp's domain
domain = unit.new_domain_ref() domain = unit.new_domain_ref()
PROVIDERS.resource_api.create_domain(domain['id'], domain) PROVIDERS.resource_api.create_domain(domain['id'], domain)
body['domain_id'] = domain['id'] body['domain_id'] = domain['id']
body = {'identity_provider': body} body = {'identity_provider': body}
url = self.base_url(suffix=idp_id) url = self.base_url(suffix=idp_id)
self.patch(url, body=body, expected_status=http_client.BAD_REQUEST) self.patch(url, body=body, expected_status=http.client.BAD_REQUEST)
def test_create_idp_with_nonexistent_domain_id_fails(self): def test_create_idp_with_nonexistent_domain_id_fails(self):
body = self.default_body.copy() body = self.default_body.copy()
body['description'] = uuid.uuid4().hex body['description'] = uuid.uuid4().hex
body['domain_id'] = uuid.uuid4().hex body['domain_id'] = uuid.uuid4().hex
self._create_default_idp(body=body, self._create_default_idp(body=body,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_create_idp_remote(self): def test_create_idp_remote(self):
"""Create the IdentityProvider entity associated to remote_ids.""" """Create the IdentityProvider entity associated to remote_ids."""
keys_to_check = list(self.idp_keys) keys_to_check = list(self.idp_keys)
keys_to_check.append('remote_ids') keys_to_check.append('remote_ids')
body = self.default_body.copy() body = self.default_body.copy()
body['description'] = uuid.uuid4().hex body['description'] = uuid.uuid4().hex
body['remote_ids'] = [uuid.uuid4().hex, body['remote_ids'] = [uuid.uuid4().hex,
uuid.uuid4().hex, uuid.uuid4().hex,
uuid.uuid4().hex] uuid.uuid4().hex]
skipping to change at line 1099 skipping to change at line 1099
body['remote_ids'] = [uuid.uuid4().hex, body['remote_ids'] = [uuid.uuid4().hex,
uuid.uuid4().hex, uuid.uuid4().hex,
uuid.uuid4().hex, uuid.uuid4().hex,
repeated_remote_id] repeated_remote_id]
self._create_default_idp(body=body) self._create_default_idp(body=body)
url = self.base_url(suffix=uuid.uuid4().hex) url = self.base_url(suffix=uuid.uuid4().hex)
body['remote_ids'] = [uuid.uuid4().hex, body['remote_ids'] = [uuid.uuid4().hex,
repeated_remote_id] repeated_remote_id]
resp = self.put(url, body={'identity_provider': body}, resp = self.put(url, body={'identity_provider': body},
expected_status=http_client.CONFLICT) expected_status=http.client.CONFLICT)
resp_data = jsonutils.loads(resp.body) resp_data = jsonutils.loads(resp.body)
self.assertIn('Duplicate remote ID', self.assertIn('Duplicate remote ID',
resp_data.get('error', {}).get('message')) resp_data.get('error', {}).get('message'))
def test_create_idp_remote_empty(self): def test_create_idp_remote_empty(self):
"""Create an IdP with empty remote_ids.""" """Create an IdP with empty remote_ids."""
keys_to_check = list(self.idp_keys) keys_to_check = list(self.idp_keys)
keys_to_check.append('remote_ids') keys_to_check.append('remote_ids')
body = self.default_body.copy() body = self.default_body.copy()
skipping to change at line 1131 skipping to change at line 1131
body = self.default_body.copy() body = self.default_body.copy()
body['description'] = uuid.uuid4().hex body['description'] = uuid.uuid4().hex
body['remote_ids'] = None body['remote_ids'] = None
resp = self._create_default_idp(body=body) resp = self._create_default_idp(body=body)
expected = body.copy() expected = body.copy()
expected['remote_ids'] = [] expected['remote_ids'] = []
self.assertValidResponse(resp, 'identity_provider', dummy_validator, self.assertValidResponse(resp, 'identity_provider', dummy_validator,
keys_to_check=keys_to_check, keys_to_check=keys_to_check,
ref=expected) ref=expected)
def test_create_idp_authorization_ttl(self):
keys_to_check = list(self.idp_keys)
keys_to_check.append('authorization_ttl')
body = self.default_body.copy()
body['description'] = uuid.uuid4().hex
body['authorization_ttl'] = 10080
resp = self._create_default_idp(body)
expected = body.copy()
self.assertValidResponse(resp, 'identity_provider', dummy_validator,
keys_to_check=keys_to_check,
ref=expected)
def test_update_idp_remote_ids(self): def test_update_idp_remote_ids(self):
"""Update IdP's remote_ids parameter.""" """Update IdP's remote_ids parameter."""
body = self.default_body.copy() body = self.default_body.copy()
body['remote_ids'] = [uuid.uuid4().hex] body['remote_ids'] = [uuid.uuid4().hex]
default_resp = self._create_default_idp(body=body) default_resp = self._create_default_idp(body=body)
default_idp = self._fetch_attribute_from_response(default_resp, default_idp = self._fetch_attribute_from_response(default_resp,
'identity_provider') 'identity_provider')
idp_id = default_idp.get('id') idp_id = default_idp.get('id')
url = self.base_url(suffix=idp_id) url = self.base_url(suffix=idp_id)
self.assertIsNotNone(idp_id) self.assertIsNotNone(idp_id)
skipping to change at line 1197 skipping to change at line 1209
A remote_id is the same for both so the second IdP is not A remote_id is the same for both so the second IdP is not
updated because of the uniqueness of the remote_ids. updated because of the uniqueness of the remote_ids.
Expect HTTP 409 Conflict code for the latter call. Expect HTTP 409 Conflict code for the latter call.
""" """
# Create first identity provider # Create first identity provider
body = self.default_body.copy() body = self.default_body.copy()
repeated_remote_id = uuid.uuid4().hex repeated_remote_id = uuid.uuid4().hex
body['remote_ids'] = [uuid.uuid4().hex, body['remote_ids'] = [uuid.uuid4().hex, repeated_remote_id]
repeated_remote_id]
self._create_default_idp(body=body) self._create_default_idp(body=body)
# Create second identity provider (without remote_ids) # Create second identity provider (without remote_ids)
body = self.default_body.copy() body = self.default_body.copy()
default_resp = self._create_default_idp(body=body) default_resp = self._create_default_idp(body=body)
default_idp = self._fetch_attribute_from_response(default_resp, default_idp = self._fetch_attribute_from_response(default_resp,
'identity_provider') 'identity_provider')
idp_id = default_idp.get('id') idp_id = default_idp.get('id')
url = self.base_url(suffix=idp_id) url = self.base_url(suffix=idp_id)
body['remote_ids'] = [repeated_remote_id] body['remote_ids'] = [repeated_remote_id]
resp = self.patch(url, body={'identity_provider': body}, resp = self.patch(url, body={'identity_provider': body},
expected_status=http_client.CONFLICT) expected_status=http.client.CONFLICT)
resp_data = jsonutils.loads(resp.body) resp_data = jsonutils.loads(resp.body)
self.assertIn('Duplicate remote ID', self.assertIn('Duplicate remote ID',
resp_data['error']['message']) resp_data['error']['message'])
def test_update_idp_authorization_ttl(self):
body = self.default_body.copy()
body['authorization_ttl'] = 10080
default_resp = self._create_default_idp(body=body)
default_idp = self._fetch_attribute_from_response(default_resp,
'identity_provider')
idp_id = default_idp.get('id')
url = self.base_url(suffix=idp_id)
self.assertIsNotNone(idp_id)
body['authorization_ttl'] = None
body = {'identity_provider': body}
resp = self.patch(url, body=body)
updated_idp = self._fetch_attribute_from_response(resp,
'identity_provider')
body = body['identity_provider']
self.assertEqual(body['authorization_ttl'],
updated_idp.get('authorization_ttl'))
resp = self.get(url)
returned_idp = self._fetch_attribute_from_response(resp,
'identity_provider')
self.assertEqual(body['authorization_ttl'],
returned_idp.get('authorization_ttl'))
def test_list_head_idps(self, iterations=5): def test_list_head_idps(self, iterations=5):
"""List all available IdentityProviders. """List all available IdentityProviders.
This test collects ids of created IdPs and This test collects ids of created IdPs and
intersects it with the list of all available IdPs. intersects it with the list of all available IdPs.
List of all IdPs can be a superset of IdPs created in this test, List of all IdPs can be a superset of IdPs created in this test,
because other tests also create IdPs. because other tests also create IdPs.
""" """
def get_id(resp): def get_id(resp):
skipping to change at line 1249 skipping to change at line 1286
resp = self.get(url) resp = self.get(url)
self.assertValidListResponse(resp, 'identity_providers', self.assertValidListResponse(resp, 'identity_providers',
dummy_validator, dummy_validator,
keys_to_check=keys_to_check) keys_to_check=keys_to_check)
entities = self._fetch_attribute_from_response(resp, entities = self._fetch_attribute_from_response(resp,
'identity_providers') 'identity_providers')
entities_ids = set([e['id'] for e in entities]) entities_ids = set([e['id'] for e in entities])
ids_intersection = entities_ids.intersection(ids) ids_intersection = entities_ids.intersection(ids)
self.assertEqual(ids_intersection, ids) self.assertEqual(ids_intersection, ids)
self.head(url, expected_status=http_client.OK) self.head(url, expected_status=http.client.OK)
def test_filter_list_head_idp_by_id(self): def test_filter_list_head_idp_by_id(self):
def get_id(resp): def get_id(resp):
r = self._fetch_attribute_from_response(resp, r = self._fetch_attribute_from_response(resp,
'identity_provider') 'identity_provider')
return r.get('id') return r.get('id')
idp1_id = get_id(self._create_default_idp()) idp1_id = get_id(self._create_default_idp())
idp2_id = get_id(self._create_default_idp()) idp2_id = get_id(self._create_default_idp())
skipping to change at line 1275 skipping to change at line 1312
entities_ids = [e['id'] for e in entities] entities_ids = [e['id'] for e in entities]
self.assertItemsEqual(entities_ids, [idp1_id, idp2_id]) self.assertItemsEqual(entities_ids, [idp1_id, idp2_id])
# filter the IdP by ID. # filter the IdP by ID.
url = self.base_url() + '?id=' + idp1_id url = self.base_url() + '?id=' + idp1_id
resp = self.get(url) resp = self.get(url)
filtered_service_list = resp.json['identity_providers'] filtered_service_list = resp.json['identity_providers']
self.assertThat(filtered_service_list, matchers.HasLength(1)) self.assertThat(filtered_service_list, matchers.HasLength(1))
self.assertEqual(idp1_id, filtered_service_list[0].get('id')) self.assertEqual(idp1_id, filtered_service_list[0].get('id'))
self.head(url, expected_status=http_client.OK) self.head(url, expected_status=http.client.OK)
def test_filter_list_head_idp_by_enabled(self): def test_filter_list_head_idp_by_enabled(self):
def get_id(resp): def get_id(resp):
r = self._fetch_attribute_from_response(resp, r = self._fetch_attribute_from_response(resp,
'identity_provider') 'identity_provider')
return r.get('id') return r.get('id')
idp1_id = get_id(self._create_default_idp()) idp1_id = get_id(self._create_default_idp())
body = self.default_body.copy() body = self.default_body.copy()
skipping to change at line 1304 skipping to change at line 1341
entities_ids = [e['id'] for e in entities] entities_ids = [e['id'] for e in entities]
self.assertItemsEqual(entities_ids, [idp1_id, idp2_id]) self.assertItemsEqual(entities_ids, [idp1_id, idp2_id])
# filter the IdP by 'enabled'. # filter the IdP by 'enabled'.
url = self.base_url() + '?enabled=True' url = self.base_url() + '?enabled=True'
resp = self.get(url) resp = self.get(url)
filtered_service_list = resp.json['identity_providers'] filtered_service_list = resp.json['identity_providers']
self.assertThat(filtered_service_list, matchers.HasLength(1)) self.assertThat(filtered_service_list, matchers.HasLength(1))
self.assertEqual(idp1_id, filtered_service_list[0].get('id')) self.assertEqual(idp1_id, filtered_service_list[0].get('id'))
self.head(url, expected_status=http_client.OK) self.head(url, expected_status=http.client.OK)
def test_check_idp_uniqueness(self): def test_check_idp_uniqueness(self):
"""Add same IdP twice. """Add same IdP twice.
Expect HTTP 409 Conflict code for the latter call. Expect HTTP 409 Conflict code for the latter call.
""" """
url = self.base_url(suffix=uuid.uuid4().hex) url = self.base_url(suffix=uuid.uuid4().hex)
body = self._http_idp_input() body = self._http_idp_input()
domain = unit.new_domain_ref() domain = unit.new_domain_ref()
PROVIDERS.resource_api.create_domain(domain['id'], domain) PROVIDERS.resource_api.create_domain(domain['id'], domain)
body['domain_id'] = domain['id'] body['domain_id'] = domain['id']
self.put(url, body={'identity_provider': body}, self.put(url, body={'identity_provider': body},
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
resp = self.put(url, body={'identity_provider': body}, resp = self.put(url, body={'identity_provider': body},
expected_status=http_client.CONFLICT) expected_status=http.client.CONFLICT)
resp_data = jsonutils.loads(resp.body) resp_data = jsonutils.loads(resp.body)
self.assertIn('Duplicate entry', self.assertIn('Duplicate entry',
resp_data.get('error', {}).get('message')) resp_data.get('error', {}).get('message'))
def test_get_head_idp(self): def test_get_head_idp(self):
"""Create and later fetch IdP.""" """Create and later fetch IdP."""
body = self._http_idp_input() body = self._http_idp_input()
domain = unit.new_domain_ref() domain = unit.new_domain_ref()
PROVIDERS.resource_api.create_domain(domain['id'], domain) PROVIDERS.resource_api.create_domain(domain['id'], domain)
skipping to change at line 1345 skipping to change at line 1382
idp_id = default_idp.get('id') idp_id = default_idp.get('id')
url = self.base_url(suffix=idp_id) url = self.base_url(suffix=idp_id)
resp = self.get(url) resp = self.get(url)
# Strip keys out of `body` dictionary. This is done # Strip keys out of `body` dictionary. This is done
# to be python 3 compatible # to be python 3 compatible
body_keys = list(body) body_keys = list(body)
self.assertValidResponse(resp, 'identity_provider', self.assertValidResponse(resp, 'identity_provider',
dummy_validator, keys_to_check=body_keys, dummy_validator, keys_to_check=body_keys,
ref=body) ref=body)
self.head(url, expected_status=http_client.OK) self.head(url, expected_status=http.client.OK)
def test_get_nonexisting_idp(self): def test_get_nonexisting_idp(self):
"""Fetch nonexisting IdP entity. """Fetch nonexisting IdP entity.
Expected HTTP 404 Not Found status code. Expected HTTP 404 Not Found status code.
""" """
idp_id = uuid.uuid4().hex idp_id = uuid.uuid4().hex
self.assertIsNotNone(idp_id) self.assertIsNotNone(idp_id)
url = self.base_url(suffix=idp_id) url = self.base_url(suffix=idp_id)
self.get(url, expected_status=http_client.NOT_FOUND) self.get(url, expected_status=http.client.NOT_FOUND)
def test_delete_existing_idp(self): def test_delete_existing_idp(self):
"""Create and later delete IdP. """Create and later delete IdP.
Expect HTTP 404 Not Found for the GET IdP call. Expect HTTP 404 Not Found for the GET IdP call.
""" """
default_resp = self._create_default_idp() default_resp = self._create_default_idp()
default_idp = self._fetch_attribute_from_response(default_resp, default_idp = self._fetch_attribute_from_response(default_resp,
'identity_provider') 'identity_provider')
idp_id = default_idp.get('id') idp_id = default_idp.get('id')
self.assertIsNotNone(idp_id) self.assertIsNotNone(idp_id)
url = self.base_url(suffix=idp_id) url = self.base_url(suffix=idp_id)
self.delete(url) self.delete(url)
self.get(url, expected_status=http_client.NOT_FOUND) self.get(url, expected_status=http.client.NOT_FOUND)
def test_delete_idp_also_deletes_assigned_protocols(self): def test_delete_idp_also_deletes_assigned_protocols(self):
"""Deleting an IdP will delete its assigned protocol.""" """Deleting an IdP will delete its assigned protocol."""
# create default IdP # create default IdP
default_resp = self._create_default_idp() default_resp = self._create_default_idp()
default_idp = self._fetch_attribute_from_response(default_resp, default_idp = self._fetch_attribute_from_response(default_resp,
'identity_provider') 'identity_provider')
idp_id = default_idp['id'] idp_id = default_idp['id']
protocol_id = uuid.uuid4().hex protocol_id = uuid.uuid4().hex
url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s')
idp_url = self.base_url(suffix=idp_id) idp_url = self.base_url(suffix=idp_id)
# assign protocol to IdP # assign protocol to IdP
kwargs = {'expected_status': http_client.CREATED} kwargs = {'expected_status': http.client.CREATED}
resp, idp_id, proto = self._assign_protocol_to_idp( resp, idp_id, proto = self._assign_protocol_to_idp(
url=url, url=url,
idp_id=idp_id, idp_id=idp_id,
proto=protocol_id, proto=protocol_id,
**kwargs) **kwargs)
# removing IdP will remove the assigned protocol as well # removing IdP will remove the assigned protocol as well
self.assertEqual( self.assertEqual(
1, len(PROVIDERS.federation_api.list_protocols(idp_id)) 1, len(PROVIDERS.federation_api.list_protocols(idp_id))
) )
self.delete(idp_url) self.delete(idp_url)
self.get(idp_url, expected_status=http_client.NOT_FOUND) self.get(idp_url, expected_status=http.client.NOT_FOUND)
self.assertEqual( self.assertEqual(
0, len(PROVIDERS.federation_api.list_protocols(idp_id)) 0, len(PROVIDERS.federation_api.list_protocols(idp_id))
) )
def test_delete_nonexisting_idp(self): def test_delete_nonexisting_idp(self):
"""Delete nonexisting IdP. """Delete nonexisting IdP.
Expect HTTP 404 Not Found for the GET IdP call. Expect HTTP 404 Not Found for the GET IdP call.
""" """
idp_id = uuid.uuid4().hex idp_id = uuid.uuid4().hex
url = self.base_url(suffix=idp_id) url = self.base_url(suffix=idp_id)
self.delete(url, expected_status=http_client.NOT_FOUND) self.delete(url, expected_status=http.client.NOT_FOUND)
def test_update_idp_mutable_attributes(self): def test_update_idp_mutable_attributes(self):
"""Update IdP's mutable parameters.""" """Update IdP's mutable parameters."""
default_resp = self._create_default_idp() default_resp = self._create_default_idp()
default_idp = self._fetch_attribute_from_response(default_resp, default_idp = self._fetch_attribute_from_response(default_resp,
'identity_provider') 'identity_provider')
idp_id = default_idp.get('id') idp_id = default_idp.get('id')
url = self.base_url(suffix=idp_id) url = self.base_url(suffix=idp_id)
self.assertIsNotNone(idp_id) self.assertIsNotNone(idp_id)
skipping to change at line 1466 skipping to change at line 1503
'identity_provider') 'identity_provider')
idp_id = default_idp.get('id') idp_id = default_idp.get('id')
self.assertIsNotNone(idp_id) self.assertIsNotNone(idp_id)
body = self._http_idp_input() body = self._http_idp_input()
body['id'] = uuid.uuid4().hex body['id'] = uuid.uuid4().hex
body['protocols'] = [uuid.uuid4().hex, uuid.uuid4().hex] body['protocols'] = [uuid.uuid4().hex, uuid.uuid4().hex]
url = self.base_url(suffix=idp_id) url = self.base_url(suffix=idp_id)
self.patch(url, body={'identity_provider': body}, self.patch(url, body={'identity_provider': body},
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
def test_update_nonexistent_idp(self): def test_update_nonexistent_idp(self):
"""Update nonexistent IdP. """Update nonexistent IdP.
Expect HTTP 404 Not Found code. Expect HTTP 404 Not Found code.
""" """
idp_id = uuid.uuid4().hex idp_id = uuid.uuid4().hex
url = self.base_url(suffix=idp_id) url = self.base_url(suffix=idp_id)
body = self._http_idp_input() body = self._http_idp_input()
body['enabled'] = False body['enabled'] = False
body = {'identity_provider': body} body = {'identity_provider': body}
self.patch(url, body=body, expected_status=http_client.NOT_FOUND) self.patch(url, body=body, expected_status=http.client.NOT_FOUND)
def test_assign_protocol_to_idp(self): def test_assign_protocol_to_idp(self):
"""Assign a protocol to existing IdP.""" """Assign a protocol to existing IdP."""
self._assign_protocol_to_idp(expected_status=http_client.CREATED) self._assign_protocol_to_idp(expected_status=http.client.CREATED)
def test_protocol_composite_pk(self): def test_protocol_composite_pk(self):
"""Test that Keystone can add two entities. """Test that Keystone can add two entities.
The entities have identical names, however, attached to different The entities have identical names, however, attached to different
IdPs. IdPs.
1. Add IdP and assign it protocol with predefined name 1. Add IdP and assign it protocol with predefined name
2. Add another IdP and assign it a protocol with same name. 2. Add another IdP and assign it a protocol with same name.
Expect HTTP 201 code Expect HTTP 201 code
""" """
url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s')
kwargs = {'expected_status': http_client.CREATED} kwargs = {'expected_status': http.client.CREATED}
self._assign_protocol_to_idp(proto='saml2', self._assign_protocol_to_idp(proto='saml2',
url=url, **kwargs) url=url, **kwargs)
self._assign_protocol_to_idp(proto='saml2', self._assign_protocol_to_idp(proto='saml2',
url=url, **kwargs) url=url, **kwargs)
def test_protocol_idp_pk_uniqueness(self): def test_protocol_idp_pk_uniqueness(self):
"""Test whether Keystone checks for unique idp/protocol values. """Test whether Keystone checks for unique idp/protocol values.
Add same protocol twice, expect Keystone to reject a latter call and Add same protocol twice, expect Keystone to reject a latter call and
return HTTP 409 Conflict code. return HTTP 409 Conflict code.
""" """
url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s')
kwargs = {'expected_status': http_client.CREATED} kwargs = {'expected_status': http.client.CREATED}
resp, idp_id, proto = self._assign_protocol_to_idp(proto='saml2', resp, idp_id, proto = self._assign_protocol_to_idp(proto='saml2',
url=url, **kwargs) url=url, **kwargs)
kwargs = {'expected_status': http_client.CONFLICT} kwargs = {'expected_status': http.client.CONFLICT}
self._assign_protocol_to_idp( self._assign_protocol_to_idp(
idp_id=idp_id, proto='saml2', validate=False, url=url, **kwargs idp_id=idp_id, proto='saml2', validate=False, url=url, **kwargs
) )
def test_assign_protocol_to_nonexistent_idp(self): def test_assign_protocol_to_nonexistent_idp(self):
"""Assign protocol to IdP that doesn't exist. """Assign protocol to IdP that doesn't exist.
Expect HTTP 404 Not Found code. Expect HTTP 404 Not Found code.
""" """
idp_id = uuid.uuid4().hex idp_id = uuid.uuid4().hex
kwargs = {'expected_status': http_client.NOT_FOUND} kwargs = {'expected_status': http.client.NOT_FOUND}
self._assign_protocol_to_idp(proto='saml2', self._assign_protocol_to_idp(proto='saml2',
idp_id=idp_id, idp_id=idp_id,
validate=False, validate=False,
**kwargs) **kwargs)
def test_crud_protocol_without_protocol_id_in_url(self): def test_crud_protocol_without_protocol_id_in_url(self):
# NOTE(morgan): This test is redundant but is added to ensure # NOTE(morgan): This test is redundant but is added to ensure
# the url routing error in bug 1817313 is explicitly covered. # the url routing error in bug 1817313 is explicitly covered.
# create a protocol, but do not put the ID in the URL # create a protocol, but do not put the ID in the URL
idp_id, _ = self._create_and_decapsulate_response() idp_id, _ = self._create_and_decapsulate_response()
skipping to change at line 1555 skipping to change at line 1592
'id': uuid.uuid4().hex, 'id': uuid.uuid4().hex,
'mapping_id': mapping_id 'mapping_id': mapping_id
} }
with self.test_client() as c: with self.test_client() as c:
token = self.get_scoped_token() token = self.get_scoped_token()
# DELETE/PATCH/PUT on non-trailing `/` results in # DELETE/PATCH/PUT on non-trailing `/` results in
# METHOD_NOT_ALLOWED # METHOD_NOT_ALLOWED
c.delete('/v3/OS-FEDERATION/identity_providers/%(idp_id)s' c.delete('/v3/OS-FEDERATION/identity_providers/%(idp_id)s'
'/protocols' % {'idp_id': idp_id}, '/protocols' % {'idp_id': idp_id},
headers={'X-Auth-Token': token}, headers={'X-Auth-Token': token},
expected_status_code=http_client.METHOD_NOT_ALLOWED) expected_status_code=http.client.METHOD_NOT_ALLOWED)
c.patch('/v3/OS-FEDERATION/identity_providers/%(idp_id)s' c.patch('/v3/OS-FEDERATION/identity_providers/%(idp_id)s'
'/protocols/' % {'idp_id': idp_id}, '/protocols/' % {'idp_id': idp_id},
json={'protocol': protocol}, json={'protocol': protocol},
headers={'X-Auth-Token': token}, headers={'X-Auth-Token': token},
expected_status_code=http_client.METHOD_NOT_ALLOWED) expected_status_code=http.client.METHOD_NOT_ALLOWED)
c.put('/v3/OS-FEDERATION/identity_providers/%(idp_id)s' c.put('/v3/OS-FEDERATION/identity_providers/%(idp_id)s'
'/protocols' % {'idp_id': idp_id}, '/protocols' % {'idp_id': idp_id},
json={'protocol': protocol}, json={'protocol': protocol},
headers={'X-Auth-Token': token}, headers={'X-Auth-Token': token},
expected_status_code=http_client.METHOD_NOT_ALLOWED) expected_status_code=http.client.METHOD_NOT_ALLOWED)
# DELETE/PATCH/PUT should raise 405 with trailing '/', it is # DELETE/PATCH/PUT should raise 405 with trailing '/', it is
# remapped to without the trailing '/' by the normalization # remapped to without the trailing '/' by the normalization
# middleware. # middleware.
c.delete('/v3/OS-FEDERATION/identity_providers/%(idp_id)s' c.delete('/v3/OS-FEDERATION/identity_providers/%(idp_id)s'
'/protocols/' % {'idp_id': idp_id}, '/protocols/' % {'idp_id': idp_id},
headers={'X-Auth-Token': token}, headers={'X-Auth-Token': token},
expected_status_code=http_client.METHOD_NOT_ALLOWED) expected_status_code=http.client.METHOD_NOT_ALLOWED)
c.patch('/v3/OS-FEDERATION/identity_providers/%(idp_id)s' c.patch('/v3/OS-FEDERATION/identity_providers/%(idp_id)s'
'/protocols/' % {'idp_id': idp_id}, '/protocols/' % {'idp_id': idp_id},
json={'protocol': protocol}, json={'protocol': protocol},
headers={'X-Auth-Token': token}, headers={'X-Auth-Token': token},
expected_status_code=http_client.METHOD_NOT_ALLOWED) expected_status_code=http.client.METHOD_NOT_ALLOWED)
c.put('/v3/OS-FEDERATION/identity_providers/%(idp_id)s' c.put('/v3/OS-FEDERATION/identity_providers/%(idp_id)s'
'/protocols/' % {'idp_id': idp_id}, '/protocols/' % {'idp_id': idp_id},
json={'protocol': protocol}, json={'protocol': protocol},
headers={'X-Auth-Token': token}, headers={'X-Auth-Token': token},
expected_status_code=http_client.METHOD_NOT_ALLOWED) expected_status_code=http.client.METHOD_NOT_ALLOWED)
def test_get_head_protocol(self): def test_get_head_protocol(self):
"""Create and later fetch protocol tied to IdP.""" """Create and later fetch protocol tied to IdP."""
resp, idp_id, proto = self._assign_protocol_to_idp( resp, idp_id, proto = self._assign_protocol_to_idp(
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
proto_id = self._fetch_attribute_from_response(resp, 'protocol')['id'] proto_id = self._fetch_attribute_from_response(resp, 'protocol')['id']
url = "%s/protocols/%s" % (idp_id, proto_id) url = "%s/protocols/%s" % (idp_id, proto_id)
url = self.base_url(suffix=url) url = self.base_url(suffix=url)
resp = self.get(url) resp = self.get(url)
reference = {'id': proto_id} reference = {'id': proto_id}
# Strip keys out of `body` dictionary. This is done # Strip keys out of `body` dictionary. This is done
# to be python 3 compatible # to be python 3 compatible
reference_keys = list(reference) reference_keys = list(reference)
self.assertValidResponse(resp, 'protocol', self.assertValidResponse(resp, 'protocol',
dummy_validator, dummy_validator,
keys_to_check=reference_keys, keys_to_check=reference_keys,
ref=reference) ref=reference)
self.head(url, expected_status=http_client.OK) self.head(url, expected_status=http.client.OK)
def test_list_head_protocols(self): def test_list_head_protocols(self):
"""Create set of protocols and later list them. """Create set of protocols and later list them.
Compare input and output id sets. Compare input and output id sets.
""" """
resp, idp_id, proto = self._assign_protocol_to_idp( resp, idp_id, proto = self._assign_protocol_to_idp(
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
iterations = random.randint(0, 16) iterations = random.randint(0, 16)
protocol_ids = [] protocol_ids = []
for _ in range(iterations): for _ in range(iterations):
resp, _, proto = self._assign_protocol_to_idp( resp, _, proto = self._assign_protocol_to_idp(
idp_id=idp_id, idp_id=idp_id,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
proto_id = self._fetch_attribute_from_response(resp, 'protocol') proto_id = self._fetch_attribute_from_response(resp, 'protocol')
proto_id = proto_id['id'] proto_id = proto_id['id']
protocol_ids.append(proto_id) protocol_ids.append(proto_id)
url = "%s/protocols" % idp_id url = "%s/protocols" % idp_id
url = self.base_url(suffix=url) url = self.base_url(suffix=url)
resp = self.get(url) resp = self.get(url)
self.assertValidListResponse(resp, 'protocols', self.assertValidListResponse(resp, 'protocols',
dummy_validator, dummy_validator,
keys_to_check=['id']) keys_to_check=['id'])
entities = self._fetch_attribute_from_response(resp, 'protocols') entities = self._fetch_attribute_from_response(resp, 'protocols')
entities = set([entity['id'] for entity in entities]) entities = set([entity['id'] for entity in entities])
protocols_intersection = entities.intersection(protocol_ids) protocols_intersection = entities.intersection(protocol_ids)
self.assertEqual(protocols_intersection, set(protocol_ids)) self.assertEqual(protocols_intersection, set(protocol_ids))
self.head(url, expected_status=http_client.OK) self.head(url, expected_status=http.client.OK)
def test_update_protocols_attribute(self): def test_update_protocols_attribute(self):
"""Update protocol's attribute.""" """Update protocol's attribute."""
resp, idp_id, proto = self._assign_protocol_to_idp( resp, idp_id, proto = self._assign_protocol_to_idp(
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
new_mapping_id = uuid.uuid4().hex new_mapping_id = uuid.uuid4().hex
self._create_mapping(mapping_id=new_mapping_id) self._create_mapping(mapping_id=new_mapping_id)
url = "%s/protocols/%s" % (idp_id, proto) url = "%s/protocols/%s" % (idp_id, proto)
url = self.base_url(suffix=url) url = self.base_url(suffix=url)
body = {'mapping_id': new_mapping_id} body = {'mapping_id': new_mapping_id}
resp = self.patch(url, body={'protocol': body}) resp = self.patch(url, body={'protocol': body})
self.assertValidResponse(resp, 'protocol', dummy_validator, self.assertValidResponse(resp, 'protocol', dummy_validator,
keys_to_check=['id', 'mapping_id'], keys_to_check=['id', 'mapping_id'],
ref={'id': proto, ref={'id': proto,
skipping to change at line 1664 skipping to change at line 1701
def test_delete_protocol(self): def test_delete_protocol(self):
"""Delete protocol. """Delete protocol.
Expect HTTP 404 Not Found code for the GET call after the protocol is Expect HTTP 404 Not Found code for the GET call after the protocol is
deleted. deleted.
""" """
url = self.base_url(suffix='%(idp_id)s/' url = self.base_url(suffix='%(idp_id)s/'
'protocols/%(protocol_id)s') 'protocols/%(protocol_id)s')
resp, idp_id, proto = self._assign_protocol_to_idp( resp, idp_id, proto = self._assign_protocol_to_idp(
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
url = url % {'idp_id': idp_id, url = url % {'idp_id': idp_id,
'protocol_id': proto} 'protocol_id': proto}
self.delete(url) self.delete(url)
self.get(url, expected_status=http_client.NOT_FOUND) self.get(url, expected_status=http.client.NOT_FOUND)
class MappingCRUDTests(test_v3.RestfulTestCase): class MappingCRUDTests(test_v3.RestfulTestCase):
"""A class for testing CRUD operations for Mappings.""" """A class for testing CRUD operations for Mappings."""
MAPPING_URL = '/OS-FEDERATION/mappings/' MAPPING_URL = '/OS-FEDERATION/mappings/'
def assertValidMappingListResponse(self, resp, *args, **kwargs): def assertValidMappingListResponse(self, resp, *args, **kwargs):
return self.assertValidListResponse( return self.assertValidListResponse(
resp, resp,
'mappings', 'mappings',
skipping to change at line 1704 skipping to change at line 1741
self.assertIsNotNone(entity.get('id')) self.assertIsNotNone(entity.get('id'))
self.assertIsNotNone(entity.get('rules')) self.assertIsNotNone(entity.get('rules'))
if ref: if ref:
self.assertEqual(entity['rules'], ref['rules']) self.assertEqual(entity['rules'], ref['rules'])
return entity return entity
def _create_default_mapping_entry(self): def _create_default_mapping_entry(self):
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
resp = self.put(url, resp = self.put(url,
body={'mapping': mapping_fixtures.MAPPING_LARGE}, body={'mapping': mapping_fixtures.MAPPING_LARGE},
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
return resp return resp
def _get_id_from_response(self, resp): def _get_id_from_response(self, resp):
r = resp.result.get('mapping') r = resp.result.get('mapping')
return r.get('id') return r.get('id')
def test_mapping_create(self): def test_mapping_create(self):
resp = self._create_default_mapping_entry() resp = self._create_default_mapping_entry()
self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_LARGE) self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_LARGE)
def test_mapping_list_head(self): def test_mapping_list_head(self):
url = self.MAPPING_URL url = self.MAPPING_URL
self._create_default_mapping_entry() self._create_default_mapping_entry()
resp = self.get(url) resp = self.get(url)
entities = resp.result.get('mappings') entities = resp.result.get('mappings')
self.assertIsNotNone(entities) self.assertIsNotNone(entities)
self.assertResponseStatus(resp, http_client.OK) self.assertResponseStatus(resp, http.client.OK)
self.assertValidListLinks(resp.result.get('links')) self.assertValidListLinks(resp.result.get('links'))
self.assertEqual(1, len(entities)) self.assertEqual(1, len(entities))
self.head(url, expected_status=http_client.OK) self.head(url, expected_status=http.client.OK)
def test_mapping_delete(self): def test_mapping_delete(self):
url = self.MAPPING_URL + '%(mapping_id)s' url = self.MAPPING_URL + '%(mapping_id)s'
resp = self._create_default_mapping_entry() resp = self._create_default_mapping_entry()
mapping_id = self._get_id_from_response(resp) mapping_id = self._get_id_from_response(resp)
url = url % {'mapping_id': str(mapping_id)} url = url % {'mapping_id': str(mapping_id)}
resp = self.delete(url) resp = self.delete(url)
self.assertResponseStatus(resp, http_client.NO_CONTENT) self.assertResponseStatus(resp, http.client.NO_CONTENT)
self.get(url, expected_status=http_client.NOT_FOUND) self.get(url, expected_status=http.client.NOT_FOUND)
def test_mapping_get_head(self): def test_mapping_get_head(self):
url = self.MAPPING_URL + '%(mapping_id)s' url = self.MAPPING_URL + '%(mapping_id)s'
resp = self._create_default_mapping_entry() resp = self._create_default_mapping_entry()
mapping_id = self._get_id_from_response(resp) mapping_id = self._get_id_from_response(resp)
url = url % {'mapping_id': mapping_id} url = url % {'mapping_id': mapping_id}
resp = self.get(url) resp = self.get(url)
self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_LARGE) self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_LARGE)
self.head(url, expected_status=http_client.OK) self.head(url, expected_status=http.client.OK)
def test_mapping_update(self): def test_mapping_update(self):
url = self.MAPPING_URL + '%(mapping_id)s' url = self.MAPPING_URL + '%(mapping_id)s'
resp = self._create_default_mapping_entry() resp = self._create_default_mapping_entry()
mapping_id = self._get_id_from_response(resp) mapping_id = self._get_id_from_response(resp)
url = url % {'mapping_id': mapping_id} url = url % {'mapping_id': mapping_id}
resp = self.patch(url, resp = self.patch(url,
body={'mapping': mapping_fixtures.MAPPING_SMALL}) body={'mapping': mapping_fixtures.MAPPING_SMALL})
self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_SMALL) self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_SMALL)
resp = self.get(url) resp = self.get(url)
self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_SMALL) self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_SMALL)
def test_delete_mapping_dne(self): def test_delete_mapping_dne(self):
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
self.delete(url, expected_status=http_client.NOT_FOUND) self.delete(url, expected_status=http.client.NOT_FOUND)
def test_get_mapping_dne(self): def test_get_mapping_dne(self):
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
self.get(url, expected_status=http_client.NOT_FOUND) self.get(url, expected_status=http.client.NOT_FOUND)
def test_create_mapping_bad_requirements(self): def test_create_mapping_bad_requirements(self):
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
self.put(url, expected_status=http_client.BAD_REQUEST, self.put(url, expected_status=http.client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_BAD_REQ}) body={'mapping': mapping_fixtures.MAPPING_BAD_REQ})
def test_create_mapping_no_rules(self): def test_create_mapping_no_rules(self):
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
self.put(url, expected_status=http_client.BAD_REQUEST, self.put(url, expected_status=http.client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_NO_RULES}) body={'mapping': mapping_fixtures.MAPPING_NO_RULES})
def test_create_mapping_no_remote_objects(self): def test_create_mapping_no_remote_objects(self):
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
self.put(url, expected_status=http_client.BAD_REQUEST, self.put(url, expected_status=http.client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_NO_REMOTE}) body={'mapping': mapping_fixtures.MAPPING_NO_REMOTE})
def test_create_mapping_bad_value(self): def test_create_mapping_bad_value(self):
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
self.put(url, expected_status=http_client.BAD_REQUEST, self.put(url, expected_status=http.client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_BAD_VALUE}) body={'mapping': mapping_fixtures.MAPPING_BAD_VALUE})
def test_create_mapping_missing_local(self): def test_create_mapping_missing_local(self):
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
self.put(url, expected_status=http_client.BAD_REQUEST, self.put(url, expected_status=http.client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_MISSING_LOCAL}) body={'mapping': mapping_fixtures.MAPPING_MISSING_LOCAL})
def test_create_mapping_missing_type(self): def test_create_mapping_missing_type(self):
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
self.put(url, expected_status=http_client.BAD_REQUEST, self.put(url, expected_status=http.client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_MISSING_TYPE}) body={'mapping': mapping_fixtures.MAPPING_MISSING_TYPE})
def test_create_mapping_wrong_type(self): def test_create_mapping_wrong_type(self):
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
self.put(url, expected_status=http_client.BAD_REQUEST, self.put(url, expected_status=http.client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_WRONG_TYPE}) body={'mapping': mapping_fixtures.MAPPING_WRONG_TYPE})
def test_create_mapping_extra_remote_properties_not_any_of(self): def test_create_mapping_extra_remote_properties_not_any_of(self):
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_NOT_ANY_OF mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_NOT_ANY_OF
self.put(url, expected_status=http_client.BAD_REQUEST, self.put(url, expected_status=http.client.BAD_REQUEST,
body={'mapping': mapping}) body={'mapping': mapping})
def test_create_mapping_extra_remote_properties_any_one_of(self): def test_create_mapping_extra_remote_properties_any_one_of(self):
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_ANY_ONE_OF mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_ANY_ONE_OF
self.put(url, expected_status=http_client.BAD_REQUEST, self.put(url, expected_status=http.client.BAD_REQUEST,
body={'mapping': mapping}) body={'mapping': mapping})
def test_create_mapping_extra_remote_properties_just_type(self): def test_create_mapping_extra_remote_properties_just_type(self):
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_JUST_TYPE mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_JUST_TYPE
self.put(url, expected_status=http_client.BAD_REQUEST, self.put(url, expected_status=http.client.BAD_REQUEST,
body={'mapping': mapping}) body={'mapping': mapping})
def test_create_mapping_empty_map(self): def test_create_mapping_empty_map(self):
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
self.put(url, expected_status=http_client.BAD_REQUEST, self.put(url, expected_status=http.client.BAD_REQUEST,
body={'mapping': {}}) body={'mapping': {}})
def test_create_mapping_extra_rules_properties(self): def test_create_mapping_extra_rules_properties(self):
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
self.put(url, expected_status=http_client.BAD_REQUEST, self.put(url, expected_status=http.client.BAD_REQUEST,
body={'mapping': mapping_fixtures.MAPPING_EXTRA_RULES_PROPS}) body={'mapping': mapping_fixtures.MAPPING_EXTRA_RULES_PROPS})
def test_create_mapping_with_blacklist_and_whitelist(self): def test_create_mapping_with_blacklist_and_whitelist(self):
"""Test for adding whitelist and blacklist in the rule. """Test for adding whitelist and blacklist in the rule.
Server should respond with HTTP 400 Bad Request error upon discovering Server should respond with HTTP 400 Bad Request error upon discovering
both ``whitelist`` and ``blacklist`` keywords in the same rule. both ``whitelist`` and ``blacklist`` keywords in the same rule.
""" """
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST_AND_BLACKLIST mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST_AND_BLACKLIST
self.put(url, expected_status=http_client.BAD_REQUEST, self.put(url, expected_status=http.client.BAD_REQUEST,
body={'mapping': mapping}) body={'mapping': mapping})
def test_create_mapping_with_local_user_and_local_domain(self): def test_create_mapping_with_local_user_and_local_domain(self):
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
resp = self.put( resp = self.put(
url, url,
body={ body={
'mapping': mapping_fixtures.MAPPING_LOCAL_USER_LOCAL_DOMAIN 'mapping': mapping_fixtures.MAPPING_LOCAL_USER_LOCAL_DOMAIN
}, },
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
self.assertValidMappingResponse( self.assertValidMappingResponse(
resp, mapping_fixtures.MAPPING_LOCAL_USER_LOCAL_DOMAIN) resp, mapping_fixtures.MAPPING_LOCAL_USER_LOCAL_DOMAIN)
def test_create_mapping_with_ephemeral(self): def test_create_mapping_with_ephemeral(self):
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
resp = self.put( resp = self.put(
url, url,
body={'mapping': mapping_fixtures.MAPPING_EPHEMERAL_USER}, body={'mapping': mapping_fixtures.MAPPING_EPHEMERAL_USER},
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
self.assertValidMappingResponse( self.assertValidMappingResponse(
resp, mapping_fixtures.MAPPING_EPHEMERAL_USER) resp, mapping_fixtures.MAPPING_EPHEMERAL_USER)
def test_create_mapping_with_bad_user_type(self): def test_create_mapping_with_bad_user_type(self):
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
# get a copy of a known good map # get a copy of a known good map
bad_mapping = copy.deepcopy(mapping_fixtures.MAPPING_EPHEMERAL_USER) bad_mapping = copy.deepcopy(mapping_fixtures.MAPPING_EPHEMERAL_USER)
# now sabotage the user type # now sabotage the user type
bad_mapping['rules'][0]['local'][0]['user']['type'] = uuid.uuid4().hex bad_mapping['rules'][0]['local'][0]['user']['type'] = uuid.uuid4().hex
self.put(url, expected_status=http_client.BAD_REQUEST, self.put(url, expected_status=http.client.BAD_REQUEST,
body={'mapping': bad_mapping}) body={'mapping': bad_mapping})
def test_create_shadow_mapping_without_roles_fails(self): def test_create_shadow_mapping_without_roles_fails(self):
"""Validate that mappings with projects contain roles when created.""" """Validate that mappings with projects contain roles when created."""
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
self.put( self.put(
url, url,
body={'mapping': mapping_fixtures.MAPPING_PROJECTS_WITHOUT_ROLES}, body={'mapping': mapping_fixtures.MAPPING_PROJECTS_WITHOUT_ROLES},
expected_status=http_client.BAD_REQUEST expected_status=http.client.BAD_REQUEST
) )
def test_update_shadow_mapping_without_roles_fails(self): def test_update_shadow_mapping_without_roles_fails(self):
"""Validate that mappings with projects contain roles when updated.""" """Validate that mappings with projects contain roles when updated."""
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
resp = self.put( resp = self.put(
url, url,
body={'mapping': mapping_fixtures.MAPPING_PROJECTS}, body={'mapping': mapping_fixtures.MAPPING_PROJECTS},
expected_status=http_client.CREATED expected_status=http.client.CREATED
) )
self.assertValidMappingResponse( self.assertValidMappingResponse(
resp, mapping_fixtures.MAPPING_PROJECTS resp, mapping_fixtures.MAPPING_PROJECTS
) )
self.patch( self.patch(
url, url,
body={'mapping': mapping_fixtures.MAPPING_PROJECTS_WITHOUT_ROLES}, body={'mapping': mapping_fixtures.MAPPING_PROJECTS_WITHOUT_ROLES},
expected_status=http_client.BAD_REQUEST expected_status=http.client.BAD_REQUEST
) )
def test_create_shadow_mapping_without_name_fails(self): def test_create_shadow_mapping_without_name_fails(self):
"""Validate project mappings contain the project name when created.""" """Validate project mappings contain the project name when created."""
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
self.put( self.put(
url, url,
body={'mapping': mapping_fixtures.MAPPING_PROJECTS_WITHOUT_NAME}, body={'mapping': mapping_fixtures.MAPPING_PROJECTS_WITHOUT_NAME},
expected_status=http_client.BAD_REQUEST expected_status=http.client.BAD_REQUEST
) )
def test_update_shadow_mapping_without_name_fails(self): def test_update_shadow_mapping_without_name_fails(self):
"""Validate project mappings contain the project name when updated.""" """Validate project mappings contain the project name when updated."""
url = self.MAPPING_URL + uuid.uuid4().hex url = self.MAPPING_URL + uuid.uuid4().hex
resp = self.put( resp = self.put(
url, url,
body={'mapping': mapping_fixtures.MAPPING_PROJECTS}, body={'mapping': mapping_fixtures.MAPPING_PROJECTS},
expected_status=http_client.CREATED expected_status=http.client.CREATED
) )
self.assertValidMappingResponse( self.assertValidMappingResponse(
resp, mapping_fixtures.MAPPING_PROJECTS resp, mapping_fixtures.MAPPING_PROJECTS
) )
self.patch( self.patch(
url, url,
body={'mapping': mapping_fixtures.MAPPING_PROJECTS_WITHOUT_NAME}, body={'mapping': mapping_fixtures.MAPPING_PROJECTS_WITHOUT_NAME},
expected_status=http_client.BAD_REQUEST expected_status=http.client.BAD_REQUEST
) )
class FederatedTokenTests(test_v3.RestfulTestCase, FederatedSetupMixin): class FederatedTokenTests(test_v3.RestfulTestCase, FederatedSetupMixin):
def auth_plugin_config_override(self): def auth_plugin_config_override(self):
methods = ['saml2', 'token'] methods = ['saml2', 'token']
super(FederatedTokenTests, self).auth_plugin_config_override(methods) super(FederatedTokenTests, self).auth_plugin_config_override(methods)
def setUp(self): def setUp(self):
super(FederatedTokenTests, self).setUp() super(FederatedTokenTests, self).setUp()
skipping to change at line 2160 skipping to change at line 2197
PROVIDERS.assignment_api.create_grant( PROVIDERS.assignment_api.create_grant(
self.role_admin['id'], user_id=admin['id'], self.role_admin['id'], user_id=admin['id'],
project_id=self.proj_employees['id'] project_id=self.proj_employees['id']
) )
# try to scope the token. It should fail # try to scope the token. It should fail
scope = self._scope_request( scope = self._scope_request(
unscoped_token, 'project', self.proj_employees['id'] unscoped_token, 'project', self.proj_employees['id']
) )
self.v3_create_token( self.v3_create_token(
scope, expected_status=http_client.UNAUTHORIZED) scope, expected_status=http.client.UNAUTHORIZED)
def test_issue_unscoped_token_malformed_environment(self): def test_issue_unscoped_token_malformed_environment(self):
"""Test whether non string objects are filtered out. """Test whether non string objects are filtered out.
Put non string objects into the environment, inject Put non string objects into the environment, inject
correct assertion and try to get an unscoped token. correct assertion and try to get an unscoped token.
Expect server not to fail on using split() method on Expect server not to fail on using split() method on
non string objects and return token id in the HTTP header. non string objects and return token id in the HTTP header.
""" """
skipping to change at line 2219 skipping to change at line 2256
Test plan: Test plan:
1) Disable IdP 1) Disable IdP
2) Try scoping unscoped token 2) Try scoping unscoped token
""" """
enabled_false = {'enabled': False} enabled_false = {'enabled': False}
PROVIDERS.federation_api.update_idp(self.IDP, enabled_false) PROVIDERS.federation_api.update_idp(self.IDP, enabled_false)
self.v3_create_token( self.v3_create_token(
self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER, self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_validate_token_after_deleting_idp_raises_not_found(self): def test_validate_token_after_deleting_idp_raises_not_found(self):
token = self.v3_create_token( token = self.v3_create_token(
self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_ADMIN self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_ADMIN
) )
token_id = token.headers.get('X-Subject-Token') token_id = token.headers.get('X-Subject-Token')
federated_info = token.json_body['token']['user']['OS-FEDERATION'] federated_info = token.json_body['token']['user']['OS-FEDERATION']
idp_id = federated_info['identity_provider']['id'] idp_id = federated_info['identity_provider']['id']
PROVIDERS.federation_api.delete_idp(idp_id) PROVIDERS.federation_api.delete_idp(idp_id)
headers = { headers = {
'X-Subject-Token': token_id 'X-Subject-Token': token_id
} }
# NOTE(lbragstad): This raises a 404 NOT FOUND because the identity # NOTE(lbragstad): This raises a 404 NOT FOUND because the identity
# provider is no longer present. We raise 404 NOT FOUND when we # provider is no longer present. We raise 404 NOT FOUND when we
# validate a token and a project or domain no longer exists. # validate a token and a project or domain no longer exists.
self.get( self.get(
'/auth/tokens/', '/auth/tokens/',
token=token_id, token=token_id,
headers=headers, headers=headers,
expected_status=http_client.NOT_FOUND expected_status=http.client.NOT_FOUND
) )
def test_deleting_idp_cascade_deleting_fed_user(self): def test_deleting_idp_cascade_deleting_fed_user(self):
token = self.v3_create_token( token = self.v3_create_token(
self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_ADMIN self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_ADMIN
) )
federated_info = token.json_body['token']['user']['OS-FEDERATION'] federated_info = token.json_body['token']['user']['OS-FEDERATION']
idp_id = federated_info['identity_provider']['id'] idp_id = federated_info['identity_provider']['id']
# There are three fed users (from 'EMPLOYEE_ASSERTION', # There are three fed users (from 'EMPLOYEE_ASSERTION',
skipping to change at line 2272 skipping to change at line 2309
# The related federated user should be deleted as well. # The related federated user should be deleted as well.
hints = driver_hints.Hints() hints = driver_hints.Hints()
hints.add_filter('idp_id', idp_id) hints.add_filter('idp_id', idp_id)
fed_users = PROVIDERS.shadow_users_api.get_federated_users(hints) fed_users = PROVIDERS.shadow_users_api.get_federated_users(hints)
self.assertEqual([], fed_users) self.assertEqual([], fed_users)
def test_scope_to_bad_project(self): def test_scope_to_bad_project(self):
"""Scope unscoped token with a project we don't have access to.""" """Scope unscoped token with a project we don't have access to."""
self.v3_create_token( self.v3_create_token(
self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER, self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_scope_to_project_multiple_times(self): def test_scope_to_project_multiple_times(self):
"""Try to scope the unscoped token multiple times. """Try to scope the unscoped token multiple times.
The new tokens should be scoped to: The new tokens should be scoped to:
* Customers' project * Customers' project
* Employees' project * Employees' project
""" """
skipping to change at line 2332 skipping to change at line 2369
token_resp, self.project_inherited['id']) token_resp, self.project_inherited['id'])
roles_ref = [self.role_customer] roles_ref = [self.role_customer]
projects_ref = self.project_inherited projects_ref = self.project_inherited
self._check_projects_and_roles(token_resp, roles_ref, projects_ref) self._check_projects_and_roles(token_resp, roles_ref, projects_ref)
self.assertValidMappedUser(token_resp) self.assertValidMappedUser(token_resp)
def test_scope_token_from_nonexistent_unscoped_token(self): def test_scope_token_from_nonexistent_unscoped_token(self):
"""Try to scope token from non-existent unscoped token.""" """Try to scope token from non-existent unscoped token."""
self.v3_create_token( self.v3_create_token(
self.TOKEN_SCOPE_PROJECT_FROM_NONEXISTENT_TOKEN, self.TOKEN_SCOPE_PROJECT_FROM_NONEXISTENT_TOKEN,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_issue_token_from_rules_without_user(self): def test_issue_token_from_rules_without_user(self):
environ = copy.deepcopy(mapping_fixtures.BAD_TESTER_ASSERTION) environ = copy.deepcopy(mapping_fixtures.BAD_TESTER_ASSERTION)
with self.make_request(environ=environ): with self.make_request(environ=environ):
self.assertRaises(exception.Unauthorized, self.assertRaises(exception.Unauthorized,
authentication.authenticate_for_token, authentication.authenticate_for_token,
self.UNSCOPED_V3_SAML2_REQ) self.UNSCOPED_V3_SAML2_REQ)
def test_issue_token_with_nonexistent_group(self): def test_issue_token_with_nonexistent_group(self):
"""Inject assertion that matches rule issuing bad group id. """Inject assertion that matches rule issuing bad group id.
skipping to change at line 2385 skipping to change at line 2422
for body, domain_id_ref in zip(bodies, domain_ids): for body, domain_id_ref in zip(bodies, domain_ids):
r = self.v3_create_token(body) r = self.v3_create_token(body)
token_resp = r.result['token'] token_resp = r.result['token']
self._check_domain_scoped_token_attributes(token_resp, self._check_domain_scoped_token_attributes(token_resp,
domain_id_ref) domain_id_ref)
def test_scope_to_domain_with_only_inherited_roles_fails(self): def test_scope_to_domain_with_only_inherited_roles_fails(self):
"""Try to scope to a domain that has no direct roles.""" """Try to scope to a domain that has no direct roles."""
self.v3_create_token( self.v3_create_token(
self.TOKEN_SCOPE_DOMAIN_D_FROM_CUSTOMER, self.TOKEN_SCOPE_DOMAIN_D_FROM_CUSTOMER,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
def test_list_projects(self): def test_list_projects(self):
urls = ('/OS-FEDERATION/projects', '/auth/projects') urls = ('/OS-FEDERATION/projects', '/auth/projects')
token = (self.tokens['CUSTOMER_ASSERTION'], token = (self.tokens['CUSTOMER_ASSERTION'],
self.tokens['EMPLOYEE_ASSERTION'], self.tokens['EMPLOYEE_ASSERTION'],
self.tokens['ADMIN_ASSERTION']) self.tokens['ADMIN_ASSERTION'])
projects_refs = (set([self.proj_customers['id'], projects_refs = (set([self.proj_customers['id'],
self.project_inherited['id']]), self.project_inherited['id']]),
skipping to change at line 2568 skipping to change at line 2605
# delete group # delete group
PROVIDERS.identity_api.delete_group(group['id']) PROVIDERS.identity_api.delete_group(group['id'])
# scope token to project_all, expect HTTP 500 # scope token to project_all, expect HTTP 500
scoped_token = self._scope_request( scoped_token = self._scope_request(
r.id, 'project', r.id, 'project',
self.project_all['id']) self.project_all['id'])
self.v3_create_token( self.v3_create_token(
scoped_token, expected_status=http_client.INTERNAL_SERVER_ERROR) scoped_token, expected_status=http.client.INTERNAL_SERVER_ERROR)
def test_lists_with_missing_group_in_backend(self): def test_lists_with_missing_group_in_backend(self):
"""Test a mapping that points to a group that does not exist. """Test a mapping that points to a group that does not exist.
For explicit mappings, we expect the group to exist in the backend, For explicit mappings, we expect the group to exist in the backend,
but for lists, specifically blacklists, a missing group is expected but for lists, specifically blacklists, a missing group is expected
as many groups will be specified by the IdP that are not Keystone as many groups will be specified by the IdP that are not Keystone
groups. groups.
The test scenario is as follows: The test scenario is as follows:
skipping to change at line 3206 skipping to change at line 3243
PROVIDERS.role_api.create_role(role_ref['id'], role_ref) PROVIDERS.role_api.create_role(role_ref['id'], role_ref)
# authenticate via saml get back a user id # authenticate via saml get back a user id
user_id, unscoped_token = self._authenticate_via_saml() user_id, unscoped_token = self._authenticate_via_saml()
# exchange an unscoped token for a scoped token; resulting in # exchange an unscoped token for a scoped token; resulting in
# unauthorized because the user doesn't have any role assignments # unauthorized because the user doesn't have any role assignments
v3_scope_request = self._scope_request(unscoped_token, 'project', v3_scope_request = self._scope_request(unscoped_token, 'project',
project_ref['id']) project_ref['id'])
r = self.v3_create_token(v3_scope_request, r = self.v3_create_token(v3_scope_request,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
# assign project role to federated user # assign project role to federated user
PROVIDERS.assignment_api.add_role_to_user_and_project( PROVIDERS.assignment_api.add_role_to_user_and_project(
user_id, project_ref['id'], role_ref['id']) user_id, project_ref['id'], role_ref['id'])
# exchange an unscoped token for a scoped token # exchange an unscoped token for a scoped token
r = self.v3_create_token(v3_scope_request, r = self.v3_create_token(v3_scope_request,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
scoped_token = r.headers['X-Subject-Token'] scoped_token = r.headers['X-Subject-Token']
# ensure user can access resource based on role assignment # ensure user can access resource based on role assignment
path = '/projects/%(project_id)s' % {'project_id': project_ref['id']} path = '/projects/%(project_id)s' % {'project_id': project_ref['id']}
r = self.v3_request(path=path, method='GET', r = self.v3_request(path=path, method='GET',
expected_status=http_client.OK, expected_status=http.client.OK,
token=scoped_token) token=scoped_token)
self.assertValidProjectResponse(r, project_ref) self.assertValidProjectResponse(r, project_ref)
# create a 2nd project # create a 2nd project
project_ref2 = unit.new_project_ref( project_ref2 = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id) domain_id=CONF.identity.default_domain_id)
PROVIDERS.resource_api.create_project(project_ref2['id'], project_ref2) PROVIDERS.resource_api.create_project(project_ref2['id'], project_ref2)
# ensure the user cannot access the 2nd resource (forbidden) # ensure the user cannot access the 2nd resource (forbidden)
path = '/projects/%(project_id)s' % {'project_id': project_ref2['id']} path = '/projects/%(project_id)s' % {'project_id': project_ref2['id']}
r = self.v3_request(path=path, method='GET', r = self.v3_request(path=path, method='GET',
expected_status=http_client.FORBIDDEN, expected_status=http.client.FORBIDDEN,
token=scoped_token) token=scoped_token)
def test_domain_scoped_user_role_assignment(self): def test_domain_scoped_user_role_assignment(self):
# create domain and role # create domain and role
domain_ref = unit.new_domain_ref() domain_ref = unit.new_domain_ref()
PROVIDERS.resource_api.create_domain(domain_ref['id'], domain_ref) PROVIDERS.resource_api.create_domain(domain_ref['id'], domain_ref)
role_ref = unit.new_role_ref() role_ref = unit.new_role_ref()
PROVIDERS.role_api.create_role(role_ref['id'], role_ref) PROVIDERS.role_api.create_role(role_ref['id'], role_ref)
# authenticate via saml get back a user id # authenticate via saml get back a user id
user_id, unscoped_token = self._authenticate_via_saml() user_id, unscoped_token = self._authenticate_via_saml()
# exchange an unscoped token for a scoped token; resulting in # exchange an unscoped token for a scoped token; resulting in
# unauthorized because the user doesn't have any role assignments # unauthorized because the user doesn't have any role assignments
v3_scope_request = self._scope_request(unscoped_token, 'domain', v3_scope_request = self._scope_request(unscoped_token, 'domain',
domain_ref['id']) domain_ref['id'])
r = self.v3_create_token(v3_scope_request, r = self.v3_create_token(v3_scope_request,
expected_status=http_client.UNAUTHORIZED) expected_status=http.client.UNAUTHORIZED)
# assign domain role to user # assign domain role to user
PROVIDERS.assignment_api.create_grant( PROVIDERS.assignment_api.create_grant(
role_ref['id'], user_id=user_id, domain_id=domain_ref['id'] role_ref['id'], user_id=user_id, domain_id=domain_ref['id']
) )
# exchange an unscoped token for domain scoped token and test # exchange an unscoped token for domain scoped token and test
r = self.v3_create_token(v3_scope_request, r = self.v3_create_token(v3_scope_request,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
self.assertIsNotNone(r.headers.get('X-Subject-Token')) self.assertIsNotNone(r.headers.get('X-Subject-Token'))
token_resp = r.result['token'] token_resp = r.result['token']
self.assertIn('domain', token_resp) self.assertIn('domain', token_resp)
def test_auth_projects_matches_federation_projects(self): def test_auth_projects_matches_federation_projects(self):
# create project and role # create project and role
project_ref = unit.new_project_ref( project_ref = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id) domain_id=CONF.identity.default_domain_id)
PROVIDERS.resource_api.create_project(project_ref['id'], project_ref) PROVIDERS.resource_api.create_project(project_ref['id'], project_ref)
role_ref = unit.new_role_ref() role_ref = unit.new_role_ref()
skipping to change at line 3401 skipping to change at line 3438
user_id, unscoped_token = self._authenticate_via_saml() user_id, unscoped_token = self._authenticate_via_saml()
# get federation group domains # get federation group domains
r = self.get('/OS-FEDERATION/domains', token=unscoped_token) r = self.get('/OS-FEDERATION/domains', token=unscoped_token)
group_domains = r.result['domains'] group_domains = r.result['domains']
domain_from_group = group_domains[0] domain_from_group = group_domains[0]
self.head( self.head(
'/OS-FEDERATION/domains', '/OS-FEDERATION/domains',
token=unscoped_token, token=unscoped_token,
expected_status=http_client.OK expected_status=http.client.OK
) )
# assign group domain and role to user, this should create a # assign group domain and role to user, this should create a
# duplicate domain # duplicate domain
PROVIDERS.assignment_api.create_grant( PROVIDERS.assignment_api.create_grant(
role_ref['id'], user_id=user_id, domain_id=domain_from_group['id'] role_ref['id'], user_id=user_id, domain_id=domain_from_group['id']
) )
# get user domains via /OS-FEDERATION/domains and test for duplicates # get user domains via /OS-FEDERATION/domains and test for duplicates
r = self.get('/OS-FEDERATION/domains', token=unscoped_token) r = self.get('/OS-FEDERATION/domains', token=unscoped_token)
skipping to change at line 3442 skipping to change at line 3479
user_id, unscoped_token = self._authenticate_via_saml() user_id, unscoped_token = self._authenticate_via_saml()
# get federation group projects # get federation group projects
r = self.get('/OS-FEDERATION/projects', token=unscoped_token) r = self.get('/OS-FEDERATION/projects', token=unscoped_token)
group_projects = r.result['projects'] group_projects = r.result['projects']
project_from_group = group_projects[0] project_from_group = group_projects[0]
self.head( self.head(
'/OS-FEDERATION/projects', '/OS-FEDERATION/projects',
token=unscoped_token, token=unscoped_token,
expected_status=http_client.OK expected_status=http.client.OK
) )
# assign group project and role to user, this should create a # assign group project and role to user, this should create a
# duplicate project # duplicate project
PROVIDERS.assignment_api.add_role_to_user_and_project( PROVIDERS.assignment_api.add_role_to_user_and_project(
user_id, project_from_group['id'], role_ref['id']) user_id, project_from_group['id'], role_ref['id'])
# get user projects via /OS-FEDERATION/projects and test for duplicates # get user projects via /OS-FEDERATION/projects and test for duplicates
r = self.get('/OS-FEDERATION/projects', token=unscoped_token) r = self.get('/OS-FEDERATION/projects', token=unscoped_token)
user_projects = r.result['projects'] user_projects = r.result['projects']
skipping to change at line 3806 skipping to change at line 3843
# The values of the following variables match the attributes values found # The values of the following variables match the attributes values found
# in ASSERTION_FILE # in ASSERTION_FILE
ISSUER = 'https://acme.com/FIM/sps/openstack/saml20' ISSUER = 'https://acme.com/FIM/sps/openstack/saml20'
RECIPIENT = 'http://beta.com/Shibboleth.sso/SAML2/POST' RECIPIENT = 'http://beta.com/Shibboleth.sso/SAML2/POST'
SUBJECT = 'test_user' SUBJECT = 'test_user'
SUBJECT_DOMAIN = 'user_domain' SUBJECT_DOMAIN = 'user_domain'
ROLES = ['admin', 'member'] ROLES = ['admin', 'member']
PROJECT = 'development' PROJECT = 'development'
PROJECT_DOMAIN = 'project_domain' PROJECT_DOMAIN = 'project_domain'
GROUPS = ['JSON:{"name":"group1","domain":{"name":"Default"}}',
'JSON:{"name":"group2","domain":{"name":"Default"}}']
SAML_GENERATION_ROUTE = '/auth/OS-FEDERATION/saml2' SAML_GENERATION_ROUTE = '/auth/OS-FEDERATION/saml2'
ECP_GENERATION_ROUTE = '/auth/OS-FEDERATION/saml2/ecp' ECP_GENERATION_ROUTE = '/auth/OS-FEDERATION/saml2/ecp'
ASSERTION_VERSION = "2.0" ASSERTION_VERSION = "2.0"
SERVICE_PROVDIER_ID = 'ACME' SERVICE_PROVDIER_ID = 'ACME'
def setUp(self): def setUp(self):
super(SAMLGenerationTests, self).setUp() super(SAMLGenerationTests, self).setUp()
self.signed_assertion = saml2.create_class_from_xml_string( self.signed_assertion = saml2.create_class_from_xml_string(
saml.Assertion, _load_xml(self.ASSERTION_FILE)) saml.Assertion, _load_xml(self.ASSERTION_FILE))
self.sp = core.new_service_provider_ref( self.sp = core.new_service_provider_ref(
auth_url=self.SP_AUTH_URL, sp_url=self.RECIPIENT auth_url=self.SP_AUTH_URL, sp_url=self.RECIPIENT
) )
url = '/OS-FEDERATION/service_providers/' + self.SERVICE_PROVDIER_ID url = '/OS-FEDERATION/service_providers/' + self.SERVICE_PROVDIER_ID
self.put(url, body={'service_provider': self.sp}, self.put(url, body={'service_provider': self.sp},
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
def test_samlize_token_values(self): def test_samlize_token_values(self):
"""Test the SAML generator produces a SAML object. """Test the SAML generator produces a SAML object.
Test the SAML generator directly by passing known arguments, the result Test the SAML generator directly by passing known arguments, the result
should be a SAML object that consistently includes attributes based on should be a SAML object that consistently includes attributes based on
the known arguments that were passed in. the known arguments that were passed in.
""" """
with mock.patch.object(keystone_idp, '_sign_assertion', with mock.patch.object(keystone_idp, '_sign_assertion',
return_value=self.signed_assertion): return_value=self.signed_assertion):
generator = keystone_idp.SAMLGenerator() generator = keystone_idp.SAMLGenerator()
response = generator.samlize_token(self.ISSUER, self.RECIPIENT, response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
self.SUBJECT, self.SUBJECT,
self.SUBJECT_DOMAIN, self.SUBJECT_DOMAIN,
self.ROLES, self.PROJECT, self.ROLES, self.PROJECT,
self.PROJECT_DOMAIN) self.PROJECT_DOMAIN,
self.GROUPS)
assertion = response.assertion assertion = response.assertion
self.assertIsNotNone(assertion) self.assertIsNotNone(assertion)
self.assertIsInstance(assertion, saml.Assertion) self.assertIsInstance(assertion, saml.Assertion)
issuer = response.issuer issuer = response.issuer
self.assertEqual(self.RECIPIENT, response.destination) self.assertEqual(self.RECIPIENT, response.destination)
self.assertEqual(self.ISSUER, issuer.text) self.assertEqual(self.ISSUER, issuer.text)
user_attribute = assertion.attribute_statement[0].attribute[0] user_attribute = assertion.attribute_statement[0].attribute[0]
self.assertEqual(self.SUBJECT, user_attribute.attribute_value[0].text) self.assertEqual(self.SUBJECT, user_attribute.attribute_value[0].text)
skipping to change at line 3867 skipping to change at line 3907
project_attribute = assertion.attribute_statement[0].attribute[3] project_attribute = assertion.attribute_statement[0].attribute[3]
self.assertEqual(self.PROJECT, self.assertEqual(self.PROJECT,
project_attribute.attribute_value[0].text) project_attribute.attribute_value[0].text)
project_domain_attribute = ( project_domain_attribute = (
assertion.attribute_statement[0].attribute[4]) assertion.attribute_statement[0].attribute[4])
self.assertEqual(self.PROJECT_DOMAIN, self.assertEqual(self.PROJECT_DOMAIN,
project_domain_attribute.attribute_value[0].text) project_domain_attribute.attribute_value[0].text)
group_attribute = assertion.attribute_statement[0].attribute[5]
for attribute_value in group_attribute.attribute_value:
self.assertIn(attribute_value.text, self.GROUPS)
def test_comma_in_certfile_path(self): def test_comma_in_certfile_path(self):
self.config_fixture.config( self.config_fixture.config(
group='saml', group='saml',
certfile=CONF.saml.certfile + ',') certfile=CONF.saml.certfile + ',')
generator = keystone_idp.SAMLGenerator() generator = keystone_idp.SAMLGenerator()
self.assertRaises( self.assertRaises(
exception.UnexpectedError, exception.UnexpectedError,
generator.samlize_token, generator.samlize_token,
self.ISSUER, self.ISSUER,
self.RECIPIENT, self.RECIPIENT,
self.SUBJECT, self.SUBJECT,
self.SUBJECT_DOMAIN, self.SUBJECT_DOMAIN,
self.ROLES, self.ROLES,
self.PROJECT, self.PROJECT,
self.PROJECT_DOMAIN) self.PROJECT_DOMAIN,
self.GROUPS)
def test_comma_in_keyfile_path(self): def test_comma_in_keyfile_path(self):
self.config_fixture.config( self.config_fixture.config(
group='saml', group='saml',
keyfile=CONF.saml.keyfile + ',') keyfile=CONF.saml.keyfile + ',')
generator = keystone_idp.SAMLGenerator() generator = keystone_idp.SAMLGenerator()
self.assertRaises( self.assertRaises(
exception.UnexpectedError, exception.UnexpectedError,
generator.samlize_token, generator.samlize_token,
self.ISSUER, self.ISSUER,
self.RECIPIENT, self.RECIPIENT,
self.SUBJECT, self.SUBJECT,
self.SUBJECT_DOMAIN, self.SUBJECT_DOMAIN,
self.ROLES, self.ROLES,
self.PROJECT, self.PROJECT,
self.PROJECT_DOMAIN) self.PROJECT_DOMAIN,
self.GROUPS)
def test_verify_assertion_object(self): def test_verify_assertion_object(self):
"""Test that the Assertion object is built properly. """Test that the Assertion object is built properly.
The Assertion doesn't need to be signed in this test, so The Assertion doesn't need to be signed in this test, so
_sign_assertion method is patched and doesn't alter the assertion. _sign_assertion method is patched and doesn't alter the assertion.
""" """
with mock.patch.object(keystone_idp, '_sign_assertion', with mock.patch.object(keystone_idp, '_sign_assertion',
side_effect=lambda x: x): side_effect=lambda x: x):
generator = keystone_idp.SAMLGenerator() generator = keystone_idp.SAMLGenerator()
response = generator.samlize_token(self.ISSUER, self.RECIPIENT, response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
self.SUBJECT, self.SUBJECT,
self.SUBJECT_DOMAIN, self.SUBJECT_DOMAIN,
self.ROLES, self.PROJECT, self.ROLES, self.PROJECT,
self.PROJECT_DOMAIN) self.PROJECT_DOMAIN,
self.GROUPS)
assertion = response.assertion assertion = response.assertion
self.assertEqual(self.ASSERTION_VERSION, assertion.version) self.assertEqual(self.ASSERTION_VERSION, assertion.version)
def test_valid_saml_xml(self): def test_valid_saml_xml(self):
"""Test the generated SAML object can become valid XML. """Test the generated SAML object can become valid XML.
Test the generator directly by passing known arguments, the result Test the generator directly by passing known arguments, the result
should be a SAML object that consistently includes attributes based on should be a SAML object that consistently includes attributes based on
the known arguments that were passed in. the known arguments that were passed in.
""" """
with mock.patch.object(keystone_idp, '_sign_assertion', with mock.patch.object(keystone_idp, '_sign_assertion',
return_value=self.signed_assertion): return_value=self.signed_assertion):
generator = keystone_idp.SAMLGenerator() generator = keystone_idp.SAMLGenerator()
response = generator.samlize_token(self.ISSUER, self.RECIPIENT, response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
self.SUBJECT, self.SUBJECT,
self.SUBJECT_DOMAIN, self.SUBJECT_DOMAIN,
self.ROLES, self.PROJECT, self.ROLES, self.PROJECT,
self.PROJECT_DOMAIN) self.PROJECT_DOMAIN,
self.GROUPS)
saml_str = response.to_string() saml_str = response.to_string()
response = etree.fromstring(saml_str) response = etree.fromstring(saml_str)
issuer = response[0] issuer = response[0]
assertion = response[2] assertion = response[2]
self.assertEqual(self.RECIPIENT, response.get('Destination')) self.assertEqual(self.RECIPIENT, response.get('Destination'))
self.assertEqual(self.ISSUER, issuer.text) self.assertEqual(self.ISSUER, issuer.text)
user_attribute = assertion[4][0] user_attribute = assertion[4][0]
skipping to change at line 3958 skipping to change at line 4006
role_attribute = assertion[4][2] role_attribute = assertion[4][2]
for attribute_value in role_attribute: for attribute_value in role_attribute:
self.assertIn(attribute_value.text, self.ROLES) self.assertIn(attribute_value.text, self.ROLES)
project_attribute = assertion[4][3] project_attribute = assertion[4][3]
self.assertEqual(self.PROJECT, project_attribute[0].text) self.assertEqual(self.PROJECT, project_attribute[0].text)
project_domain_attribute = assertion[4][4] project_domain_attribute = assertion[4][4]
self.assertEqual(self.PROJECT_DOMAIN, project_domain_attribute[0].text) self.assertEqual(self.PROJECT_DOMAIN, project_domain_attribute[0].text)
group_attribute = assertion[4][5]
for attribute_value in group_attribute:
self.assertIn(attribute_value.text, self.GROUPS)
def test_assertion_using_explicit_namespace_prefixes(self): def test_assertion_using_explicit_namespace_prefixes(self):
def mocked_subprocess_check_output(*popenargs, **kwargs): def mocked_subprocess_check_output(*popenargs, **kwargs):
# the last option is the assertion file to be signed # the last option is the assertion file to be signed
if popenargs[0] != ['/usr/bin/which', CONF.saml.xmlsec1_binary]: if popenargs[0] != ['/usr/bin/which', CONF.saml.xmlsec1_binary]:
filename = popenargs[0][-1] filename = popenargs[0][-1]
with open(filename, 'r') as f: with open(filename, 'r') as f:
assertion_content = f.read() assertion_content = f.read()
# since we are not testing the signature itself, we can return # since we are not testing the signature itself, we can return
# the assertion as is without signing it # the assertion as is without signing it
return assertion_content return assertion_content
with mock.patch.object(subprocess, 'check_output', with mock.patch.object(subprocess, 'check_output',
side_effect=mocked_subprocess_check_output): side_effect=mocked_subprocess_check_output):
generator = keystone_idp.SAMLGenerator() generator = keystone_idp.SAMLGenerator()
response = generator.samlize_token(self.ISSUER, self.RECIPIENT, response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
self.SUBJECT, self.SUBJECT,
self.SUBJECT_DOMAIN, self.SUBJECT_DOMAIN,
self.ROLES, self.PROJECT, self.ROLES, self.PROJECT,
self.PROJECT_DOMAIN) self.PROJECT_DOMAIN,
self.GROUPS)
assertion_xml = response.assertion.to_string() assertion_xml = response.assertion.to_string()
# The expected values in the assertions bellow need to be 'str' in # The expected values in the assertions bellow need to be 'str' in
# Python 2 and 'bytes' in Python 3 # Python 2 and 'bytes' in Python 3
# make sure we have the proper tag and prefix for the assertion # make sure we have the proper tag and prefix for the assertion
# namespace # namespace
self.assertIn(b'<saml:Assertion', assertion_xml) self.assertIn(b'<saml:Assertion', assertion_xml)
self.assertIn( self.assertIn(
('xmlns:saml="' + saml2.NAMESPACE + '"').encode('utf-8'), ('xmlns:saml="' + saml2.NAMESPACE + '"').encode('utf-8'),
assertion_xml) assertion_xml)
self.assertIn( self.assertIn(
skipping to change at line 4005 skipping to change at line 4058
the known arguments that were passed in. the known arguments that were passed in.
""" """
if not _is_xmlsec1_installed(): if not _is_xmlsec1_installed():
self.skipTest('xmlsec1 is not installed') self.skipTest('xmlsec1 is not installed')
generator = keystone_idp.SAMLGenerator() generator = keystone_idp.SAMLGenerator()
response = generator.samlize_token(self.ISSUER, self.RECIPIENT, response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
self.SUBJECT, self.SUBJECT_DOMAIN, self.SUBJECT, self.SUBJECT_DOMAIN,
self.ROLES, self.PROJECT, self.ROLES, self.PROJECT,
self.PROJECT_DOMAIN) self.PROJECT_DOMAIN,
self.GROUPS)
signature = response.assertion.signature signature = response.assertion.signature
self.assertIsNotNone(signature) self.assertIsNotNone(signature)
self.assertIsInstance(signature, xmldsig.Signature) self.assertIsInstance(signature, xmldsig.Signature)
idp_public_key = sigver.read_cert_from_file(CONF.saml.certfile, 'pem') idp_public_key = sigver.read_cert_from_file(CONF.saml.certfile, 'pem')
cert_text = signature.key_info.x509_data[0].x509_certificate.text cert_text = signature.key_info.x509_data[0].x509_certificate.text
# NOTE(stevemar): Rather than one line of text, the certificate is # NOTE(stevemar): Rather than one line of text, the certificate is
# printed with newlines for readability, we remove these so we can # printed with newlines for readability, we remove these so we can
# match it with the key that we used. # match it with the key that we used.
skipping to change at line 4069 skipping to change at line 4123
The server should return a 403 Forbidden Action. The server should return a 403 Forbidden Action.
""" """
self.config_fixture.config(group='saml', idp_entity_id=self.ISSUER) self.config_fixture.config(group='saml', idp_entity_id=self.ISSUER)
token_id = self._fetch_domain_scoped_token() token_id = self._fetch_domain_scoped_token()
body = self._create_generate_saml_request(token_id, body = self._create_generate_saml_request(token_id,
self.SERVICE_PROVDIER_ID) self.SERVICE_PROVDIER_ID)
with mock.patch.object(keystone_idp, '_sign_assertion', with mock.patch.object(keystone_idp, '_sign_assertion',
return_value=self.signed_assertion): return_value=self.signed_assertion):
self.post(self.SAML_GENERATION_ROUTE, body=body, self.post(self.SAML_GENERATION_ROUTE, body=body,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_generate_saml_route(self): def test_generate_saml_route(self):
"""Test that the SAML generation endpoint produces XML. """Test that the SAML generation endpoint produces XML.
The SAML endpoint /v3/auth/OS-FEDERATION/saml2 should take as input, The SAML endpoint /v3/auth/OS-FEDERATION/saml2 should take as input,
a scoped token ID, and a Service Provider ID. a scoped token ID, and a Service Provider ID.
The controller should fetch details about the user from the token, The controller should fetch details about the user from the token,
and details about the service provider from its ID. and details about the service provider from its ID.
This should be enough information to invoke the SAML generator and This should be enough information to invoke the SAML generator and
provide a valid SAML (XML) document back. provide a valid SAML (XML) document back.
skipping to change at line 4091 skipping to change at line 4145
""" """
self.config_fixture.config(group='saml', idp_entity_id=self.ISSUER) self.config_fixture.config(group='saml', idp_entity_id=self.ISSUER)
token_id = self._fetch_valid_token() token_id = self._fetch_valid_token()
body = self._create_generate_saml_request(token_id, body = self._create_generate_saml_request(token_id,
self.SERVICE_PROVDIER_ID) self.SERVICE_PROVDIER_ID)
with mock.patch.object(keystone_idp, '_sign_assertion', with mock.patch.object(keystone_idp, '_sign_assertion',
return_value=self.signed_assertion): return_value=self.signed_assertion):
http_response = self.post(self.SAML_GENERATION_ROUTE, body=body, http_response = self.post(self.SAML_GENERATION_ROUTE, body=body,
response_content_type='text/xml', response_content_type='text/xml',
expected_status=http_client.OK) expected_status=http.client.OK)
response = etree.fromstring(http_response.result) response = etree.fromstring(http_response.result)
issuer = response[0] issuer = response[0]
assertion = response[2] assertion = response[2]
self.assertEqual(self.RECIPIENT, response.get('Destination')) self.assertEqual(self.RECIPIENT, response.get('Destination'))
self.assertEqual(self.ISSUER, issuer.text) self.assertEqual(self.ISSUER, issuer.text)
# NOTE(stevemar): We should test this against expected values, # NOTE(stevemar): We should test this against expected values,
# but the self.xyz attribute names are uuids, and we mock out # but the self.xyz attribute names are uuids, and we mock out
skipping to change at line 4120 skipping to change at line 4174
role_attribute = assertion[4][2] role_attribute = assertion[4][2]
self.assertIsInstance(role_attribute[0].text, str) self.assertIsInstance(role_attribute[0].text, str)
project_attribute = assertion[4][3] project_attribute = assertion[4][3]
self.assertIsInstance(project_attribute[0].text, str) self.assertIsInstance(project_attribute[0].text, str)
project_domain_attribute = assertion[4][4] project_domain_attribute = assertion[4][4]
self.assertIsInstance(project_domain_attribute[0].text, str) self.assertIsInstance(project_domain_attribute[0].text, str)
group_attribute = assertion[4][5]
self.assertIsInstance(group_attribute[0].text, str)
def test_invalid_scope_body(self): def test_invalid_scope_body(self):
"""Test that missing the scope in request body raises an exception. """Test that missing the scope in request body raises an exception.
Raises exception.SchemaValidationError() - error 400 Bad Request Raises exception.SchemaValidationError() - error 400 Bad Request
""" """
token_id = uuid.uuid4().hex token_id = uuid.uuid4().hex
body = self._create_generate_saml_request(token_id, body = self._create_generate_saml_request(token_id,
self.SERVICE_PROVDIER_ID) self.SERVICE_PROVDIER_ID)
del body['auth']['scope'] del body['auth']['scope']
self.post(self.SAML_GENERATION_ROUTE, body=body, self.post(self.SAML_GENERATION_ROUTE, body=body,
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
def test_invalid_token_body(self): def test_invalid_token_body(self):
"""Test that missing the token in request body raises an exception. """Test that missing the token in request body raises an exception.
Raises exception.SchemaValidationError() - error 400 Bad Request Raises exception.SchemaValidationError() - error 400 Bad Request
""" """
token_id = uuid.uuid4().hex token_id = uuid.uuid4().hex
body = self._create_generate_saml_request(token_id, body = self._create_generate_saml_request(token_id,
self.SERVICE_PROVDIER_ID) self.SERVICE_PROVDIER_ID)
del body['auth']['identity']['token'] del body['auth']['identity']['token']
self.post(self.SAML_GENERATION_ROUTE, body=body, self.post(self.SAML_GENERATION_ROUTE, body=body,
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
def test_sp_not_found(self): def test_sp_not_found(self):
"""Test SAML generation with an invalid service provider ID. """Test SAML generation with an invalid service provider ID.
Raises exception.ServiceProviderNotFound() - error Not Found 404 Raises exception.ServiceProviderNotFound() - error Not Found 404
""" """
sp_id = uuid.uuid4().hex sp_id = uuid.uuid4().hex
token_id = self._fetch_valid_token() token_id = self._fetch_valid_token()
body = self._create_generate_saml_request(token_id, sp_id) body = self._create_generate_saml_request(token_id, sp_id)
self.post(self.SAML_GENERATION_ROUTE, body=body, self.post(self.SAML_GENERATION_ROUTE, body=body,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_sp_disabled(self): def test_sp_disabled(self):
"""Try generating assertion for disabled Service Provider.""" """Try generating assertion for disabled Service Provider."""
# Disable Service Provider # Disable Service Provider
sp_ref = {'enabled': False} sp_ref = {'enabled': False}
PROVIDERS.federation_api.update_sp(self.SERVICE_PROVDIER_ID, sp_ref) PROVIDERS.federation_api.update_sp(self.SERVICE_PROVDIER_ID, sp_ref)
token_id = self._fetch_valid_token() token_id = self._fetch_valid_token()
body = self._create_generate_saml_request(token_id, body = self._create_generate_saml_request(token_id,
self.SERVICE_PROVDIER_ID) self.SERVICE_PROVDIER_ID)
self.post(self.SAML_GENERATION_ROUTE, body=body, self.post(self.SAML_GENERATION_ROUTE, body=body,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_token_not_found(self): def test_token_not_found(self):
"""Test that an invalid token in the request body raises an exception. """Test that an invalid token in the request body raises an exception.
Raises exception.TokenNotFound() - error Not Found 404 Raises exception.TokenNotFound() - error Not Found 404
""" """
token_id = uuid.uuid4().hex token_id = uuid.uuid4().hex
body = self._create_generate_saml_request(token_id, body = self._create_generate_saml_request(token_id,
self.SERVICE_PROVDIER_ID) self.SERVICE_PROVDIER_ID)
self.post(self.SAML_GENERATION_ROUTE, body=body, self.post(self.SAML_GENERATION_ROUTE, body=body,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_generate_ecp_route(self): def test_generate_ecp_route(self):
"""Test that the ECP generation endpoint produces XML. """Test that the ECP generation endpoint produces XML.
The ECP endpoint /v3/auth/OS-FEDERATION/saml2/ecp should take the same The ECP endpoint /v3/auth/OS-FEDERATION/saml2/ecp should take the same
input as the SAML generation endpoint (scoped token ID + Service input as the SAML generation endpoint (scoped token ID + Service
Provider ID). Provider ID).
The controller should return a SAML assertion that is wrapped in a The controller should return a SAML assertion that is wrapped in a
SOAP envelope. SOAP envelope.
""" """
self.config_fixture.config(group='saml', idp_entity_id=self.ISSUER) self.config_fixture.config(group='saml', idp_entity_id=self.ISSUER)
token_id = self._fetch_valid_token() token_id = self._fetch_valid_token()
body = self._create_generate_saml_request(token_id, body = self._create_generate_saml_request(token_id,
self.SERVICE_PROVDIER_ID) self.SERVICE_PROVDIER_ID)
with mock.patch.object(keystone_idp, '_sign_assertion', with mock.patch.object(keystone_idp, '_sign_assertion',
return_value=self.signed_assertion): return_value=self.signed_assertion):
http_response = self.post(self.ECP_GENERATION_ROUTE, body=body, http_response = self.post(self.ECP_GENERATION_ROUTE, body=body,
response_content_type='text/xml', response_content_type='text/xml',
expected_status=http_client.OK) expected_status=http.client.OK)
env_response = etree.fromstring(http_response.result) env_response = etree.fromstring(http_response.result)
header = env_response[0] header = env_response[0]
# Verify the relay state starts with 'ss:mem' # Verify the relay state starts with 'ss:mem'
prefix = CONF.saml.relay_state_prefix prefix = CONF.saml.relay_state_prefix
self.assertThat(header[0].text, matchers.StartsWith(prefix)) self.assertThat(header[0].text, matchers.StartsWith(prefix))
# Verify that the content in the body matches the expected assertion # Verify that the content in the body matches the expected assertion
body = env_response[1] body = env_response[1]
skipping to change at line 4235 skipping to change at line 4292
role_attribute = assertion[4][2] role_attribute = assertion[4][2]
self.assertIsInstance(role_attribute[0].text, str) self.assertIsInstance(role_attribute[0].text, str)
project_attribute = assertion[4][3] project_attribute = assertion[4][3]
self.assertIsInstance(project_attribute[0].text, str) self.assertIsInstance(project_attribute[0].text, str)
project_domain_attribute = assertion[4][4] project_domain_attribute = assertion[4][4]
self.assertIsInstance(project_domain_attribute[0].text, str) self.assertIsInstance(project_domain_attribute[0].text, str)
group_attribute = assertion[4][5]
self.assertIsInstance(group_attribute[0].text, str)
@mock.patch('saml2.create_class_from_xml_string') @mock.patch('saml2.create_class_from_xml_string')
@mock.patch('oslo_utils.fileutils.write_to_tempfile') @mock.patch('oslo_utils.fileutils.write_to_tempfile')
@mock.patch.object(subprocess, 'check_output') @mock.patch.object(subprocess, 'check_output')
def test_sign_assertion(self, check_output_mock, def test_sign_assertion(self, check_output_mock,
write_to_tempfile_mock, create_class_mock): write_to_tempfile_mock, create_class_mock):
write_to_tempfile_mock.return_value = 'tmp_path' write_to_tempfile_mock.return_value = 'tmp_path'
check_output_mock.return_value = 'fakeoutput' check_output_mock.return_value = 'fakeoutput'
keystone_idp._sign_assertion(self.signed_assertion) keystone_idp._sign_assertion(self.signed_assertion)
skipping to change at line 4424 skipping to change at line 4484
def test_metadata_invalid_idp_entity_id(self): def test_metadata_invalid_idp_entity_id(self):
self.config_fixture.config( self.config_fixture.config(
group='saml', group='saml',
idp_entity_id=None) idp_entity_id=None)
self.assertRaises(exception.ValidationError, self.assertRaises(exception.ValidationError,
self.generator.generate_metadata) self.generator.generate_metadata)
def test_get_metadata_with_no_metadata_file_configured(self): def test_get_metadata_with_no_metadata_file_configured(self):
self.get(self.METADATA_URL, self.get(self.METADATA_URL,
expected_status=http_client.INTERNAL_SERVER_ERROR) expected_status=http.client.INTERNAL_SERVER_ERROR)
def test_get_head_metadata(self): def test_get_head_metadata(self):
self.config_fixture.config( self.config_fixture.config(
group='saml', idp_metadata_path=XMLDIR + '/idp_saml2_metadata.xml') group='saml', idp_metadata_path=XMLDIR + '/idp_saml2_metadata.xml')
self.head(self.METADATA_URL, expected_status=http_client.OK) self.head(self.METADATA_URL, expected_status=http.client.OK)
r = self.get(self.METADATA_URL, response_content_type='text/xml') r = self.get(self.METADATA_URL, response_content_type='text/xml')
self.assertEqual('text/xml', r.headers.get('Content-Type')) self.assertEqual('text/xml', r.headers.get('Content-Type'))
reference_file = _load_xml('idp_saml2_metadata.xml') reference_file = _load_xml('idp_saml2_metadata.xml')
# `reference_file` needs to be converted to bytes to be able to be # `reference_file` needs to be converted to bytes to be able to be
# compared to `r.result` in the case of Python 3. # compared to `r.result` in the case of Python 3.
reference_file = str.encode(reference_file) reference_file = str.encode(reference_file)
self.assertEqual(reference_file, r.result) self.assertEqual(reference_file, r.result)
skipping to change at line 4456 skipping to change at line 4516
SP_KEYS = ['auth_url', 'id', 'enabled', 'description', SP_KEYS = ['auth_url', 'id', 'enabled', 'description',
'relay_state_prefix', 'sp_url'] 'relay_state_prefix', 'sp_url']
def setUp(self): def setUp(self):
super(ServiceProviderTests, self).setUp() super(ServiceProviderTests, self).setUp()
# Add a Service Provider # Add a Service Provider
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
self.SP_REF = core.new_service_provider_ref() self.SP_REF = core.new_service_provider_ref()
self.SERVICE_PROVIDER = self.put( self.SERVICE_PROVIDER = self.put(
url, body={'service_provider': self.SP_REF}, url, body={'service_provider': self.SP_REF},
expected_status=http_client.CREATED).result expected_status=http.client.CREATED).result
def base_url(self, suffix=None): def base_url(self, suffix=None):
if suffix is not None: if suffix is not None:
return '/OS-FEDERATION/service_providers/' + str(suffix) return '/OS-FEDERATION/service_providers/' + str(suffix)
return '/OS-FEDERATION/service_providers' return '/OS-FEDERATION/service_providers'
def _create_default_sp(self, body=None): def _create_default_sp(self, body=None):
"""Create default Service Provider.""" """Create default Service Provider."""
url = self.base_url(suffix=uuid.uuid4().hex) url = self.base_url(suffix=uuid.uuid4().hex)
if body is None: if body is None:
body = core.new_service_provider_ref() body = core.new_service_provider_ref()
resp = self.put(url, body={'service_provider': body}, resp = self.put(url, body={'service_provider': body},
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
return resp return resp
def test_get_head_service_provider(self): def test_get_head_service_provider(self):
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
resp = self.get(url) resp = self.get(url)
self.assertValidEntity(resp.result['service_provider'], self.assertValidEntity(resp.result['service_provider'],
keys_to_check=self.SP_KEYS) keys_to_check=self.SP_KEYS)
resp = self.head(url, expected_status=http_client.OK) resp = self.head(url, expected_status=http.client.OK)
def test_get_service_provider_fail(self): def test_get_service_provider_fail(self):
url = self.base_url(suffix=uuid.uuid4().hex) url = self.base_url(suffix=uuid.uuid4().hex)
self.get(url, expected_status=http_client.NOT_FOUND) self.get(url, expected_status=http.client.NOT_FOUND)
def test_create_service_provider(self): def test_create_service_provider(self):
url = self.base_url(suffix=uuid.uuid4().hex) url = self.base_url(suffix=uuid.uuid4().hex)
sp = core.new_service_provider_ref() sp = core.new_service_provider_ref()
resp = self.put(url, body={'service_provider': sp}, resp = self.put(url, body={'service_provider': sp},
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
self.assertValidEntity(resp.result['service_provider'], self.assertValidEntity(resp.result['service_provider'],
keys_to_check=self.SP_KEYS) keys_to_check=self.SP_KEYS)
@unit.skip_if_cache_disabled('federation') @unit.skip_if_cache_disabled('federation')
def test_create_service_provider_invalidates_cache(self): def test_create_service_provider_invalidates_cache(self):
# List all service providers and make sure we only have one in the # List all service providers and make sure we only have one in the
# list. This service provider is from testing setup. # list. This service provider is from testing setup.
resp = self.get(self.base_url(), expected_status=http_client.OK) resp = self.get(self.base_url(), expected_status=http.client.OK)
self.assertThat( self.assertThat(
resp.json_body['service_providers'], resp.json_body['service_providers'],
matchers.HasLength(1) matchers.HasLength(1)
) )
# Create a new service provider. # Create a new service provider.
url = self.base_url(suffix=uuid.uuid4().hex) url = self.base_url(suffix=uuid.uuid4().hex)
sp = core.new_service_provider_ref() sp = core.new_service_provider_ref()
self.put(url, body={'service_provider': sp}, self.put(url, body={'service_provider': sp},
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
# List all service providers again and make sure we have two in the # List all service providers again and make sure we have two in the
# returned list. # returned list.
resp = self.get(self.base_url(), expected_status=http_client.OK) resp = self.get(self.base_url(), expected_status=http.client.OK)
self.assertThat( self.assertThat(
resp.json_body['service_providers'], resp.json_body['service_providers'],
matchers.HasLength(2) matchers.HasLength(2)
) )
@unit.skip_if_cache_disabled('federation') @unit.skip_if_cache_disabled('federation')
def test_delete_service_provider_invalidates_cache(self): def test_delete_service_provider_invalidates_cache(self):
# List all service providers and make sure we only have one in the # List all service providers and make sure we only have one in the
# list. This service provider is from testing setup. # list. This service provider is from testing setup.
resp = self.get(self.base_url(), expected_status=http_client.OK) resp = self.get(self.base_url(), expected_status=http.client.OK)
self.assertThat( self.assertThat(
resp.json_body['service_providers'], resp.json_body['service_providers'],
matchers.HasLength(1) matchers.HasLength(1)
) )
# Create a new service provider. # Create a new service provider.
url = self.base_url(suffix=uuid.uuid4().hex) url = self.base_url(suffix=uuid.uuid4().hex)
sp = core.new_service_provider_ref() sp = core.new_service_provider_ref()
self.put(url, body={'service_provider': sp}, self.put(url, body={'service_provider': sp},
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
# List all service providers again and make sure we have two in the # List all service providers again and make sure we have two in the
# returned list. # returned list.
resp = self.get(self.base_url(), expected_status=http_client.OK) resp = self.get(self.base_url(), expected_status=http.client.OK)
self.assertThat( self.assertThat(
resp.json_body['service_providers'], resp.json_body['service_providers'],
matchers.HasLength(2) matchers.HasLength(2)
) )
# Delete the service provider we created, which should invalidate the # Delete the service provider we created, which should invalidate the
# service provider cache. Get the list of service providers again and # service provider cache. Get the list of service providers again and
# if the cache invalidated properly then we should only have one # if the cache invalidated properly then we should only have one
# service provider in the list. # service provider in the list.
self.delete(url, expected_status=http_client.NO_CONTENT) self.delete(url, expected_status=http.client.NO_CONTENT)
resp = self.get(self.base_url(), expected_status=http_client.OK) resp = self.get(self.base_url(), expected_status=http.client.OK)
self.assertThat( self.assertThat(
resp.json_body['service_providers'], resp.json_body['service_providers'],
matchers.HasLength(1) matchers.HasLength(1)
) )
@unit.skip_if_cache_disabled('federation') @unit.skip_if_cache_disabled('federation')
def test_update_service_provider_invalidates_cache(self): def test_update_service_provider_invalidates_cache(self):
# List all service providers and make sure we only have one in the # List all service providers and make sure we only have one in the
# list. This service provider is from testing setup. # list. This service provider is from testing setup.
resp = self.get(self.base_url(), expected_status=http_client.OK) resp = self.get(self.base_url(), expected_status=http.client.OK)
self.assertThat( self.assertThat(
resp.json_body['service_providers'], resp.json_body['service_providers'],
matchers.HasLength(1) matchers.HasLength(1)
) )
# Create a new service provider. # Create a new service provider.
service_provider_id = uuid.uuid4().hex service_provider_id = uuid.uuid4().hex
url = self.base_url(suffix=service_provider_id) url = self.base_url(suffix=service_provider_id)
sp = core.new_service_provider_ref() sp = core.new_service_provider_ref()
self.put(url, body={'service_provider': sp}, self.put(url, body={'service_provider': sp},
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
# List all service providers again and make sure we have two in the # List all service providers again and make sure we have two in the
# returned list. # returned list.
resp = self.get(self.base_url(), expected_status=http_client.OK) resp = self.get(self.base_url(), expected_status=http.client.OK)
self.assertThat( self.assertThat(
resp.json_body['service_providers'], resp.json_body['service_providers'],
matchers.HasLength(2) matchers.HasLength(2)
) )
# Update the service provider we created, which should invalidate the # Update the service provider we created, which should invalidate the
# service provider cache. Get the list of service providers again and # service provider cache. Get the list of service providers again and
# if the cache invalidated properly then we see the value we updated. # if the cache invalidated properly then we see the value we updated.
updated_description = uuid.uuid4().hex updated_description = uuid.uuid4().hex
body = {'service_provider': {'description': updated_description}} body = {'service_provider': {'description': updated_description}}
self.patch(url, body=body, expected_status=http_client.OK) self.patch(url, body=body, expected_status=http.client.OK)
resp = self.get(self.base_url(), expected_status=http_client.OK) resp = self.get(self.base_url(), expected_status=http.client.OK)
self.assertThat( self.assertThat(
resp.json_body['service_providers'], resp.json_body['service_providers'],
matchers.HasLength(2) matchers.HasLength(2)
) )
for sp in resp.json_body['service_providers']: for sp in resp.json_body['service_providers']:
if sp['id'] == service_provider_id: if sp['id'] == service_provider_id:
self.assertEqual(sp['description'], updated_description) self.assertEqual(sp['description'], updated_description)
def test_create_sp_relay_state_default(self): def test_create_sp_relay_state_default(self):
"""Create an SP without relay state, should default to `ss:mem`.""" """Create an SP without relay state, should default to `ss:mem`."""
url = self.base_url(suffix=uuid.uuid4().hex) url = self.base_url(suffix=uuid.uuid4().hex)
sp = core.new_service_provider_ref() sp = core.new_service_provider_ref()
del sp['relay_state_prefix'] del sp['relay_state_prefix']
resp = self.put(url, body={'service_provider': sp}, resp = self.put(url, body={'service_provider': sp},
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
sp_result = resp.result['service_provider'] sp_result = resp.result['service_provider']
self.assertEqual(CONF.saml.relay_state_prefix, self.assertEqual(CONF.saml.relay_state_prefix,
sp_result['relay_state_prefix']) sp_result['relay_state_prefix'])
def test_create_sp_relay_state_non_default(self): def test_create_sp_relay_state_non_default(self):
"""Create an SP with custom relay state.""" """Create an SP with custom relay state."""
url = self.base_url(suffix=uuid.uuid4().hex) url = self.base_url(suffix=uuid.uuid4().hex)
sp = core.new_service_provider_ref() sp = core.new_service_provider_ref()
non_default_prefix = uuid.uuid4().hex non_default_prefix = uuid.uuid4().hex
sp['relay_state_prefix'] = non_default_prefix sp['relay_state_prefix'] = non_default_prefix
resp = self.put(url, body={'service_provider': sp}, resp = self.put(url, body={'service_provider': sp},
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
sp_result = resp.result['service_provider'] sp_result = resp.result['service_provider']
self.assertEqual(non_default_prefix, self.assertEqual(non_default_prefix,
sp_result['relay_state_prefix']) sp_result['relay_state_prefix'])
def test_create_service_provider_fail(self): def test_create_service_provider_fail(self):
"""Try adding SP object with unallowed attribute.""" """Try adding SP object with unallowed attribute."""
url = self.base_url(suffix=uuid.uuid4().hex) url = self.base_url(suffix=uuid.uuid4().hex)
sp = core.new_service_provider_ref() sp = core.new_service_provider_ref()
sp[uuid.uuid4().hex] = uuid.uuid4().hex sp[uuid.uuid4().hex] = uuid.uuid4().hex
self.put(url, body={'service_provider': sp}, self.put(url, body={'service_provider': sp},
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
def test_list_head_service_providers(self): def test_list_head_service_providers(self):
"""Test listing of service provider objects. """Test listing of service provider objects.
Add two new service providers. List all available service providers. Add two new service providers. List all available service providers.
Expect to get list of three service providers (one created by setUp()) Expect to get list of three service providers (one created by setUp())
Test if attributes match. Test if attributes match.
""" """
ref_service_providers = { ref_service_providers = {
uuid.uuid4().hex: core.new_service_provider_ref(), uuid.uuid4().hex: core.new_service_provider_ref(),
uuid.uuid4().hex: core.new_service_provider_ref(), uuid.uuid4().hex: core.new_service_provider_ref(),
} }
for id, sp in ref_service_providers.items(): for id, sp in ref_service_providers.items():
url = self.base_url(suffix=id) url = self.base_url(suffix=id)
self.put(url, body={'service_provider': sp}, self.put(url, body={'service_provider': sp},
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
# Insert ids into service provider object, we will compare it with # Insert ids into service provider object, we will compare it with
# responses from server and those include 'id' attribute. # responses from server and those include 'id' attribute.
ref_service_providers[self.SERVICE_PROVIDER_ID] = self.SP_REF ref_service_providers[self.SERVICE_PROVIDER_ID] = self.SP_REF
for id, sp in ref_service_providers.items(): for id, sp in ref_service_providers.items():
sp['id'] = id sp['id'] = id
url = self.base_url() url = self.base_url()
resp = self.get(url) resp = self.get(url)
service_providers = resp.result service_providers = resp.result
for service_provider in service_providers['service_providers']: for service_provider in service_providers['service_providers']:
id = service_provider['id'] id = service_provider['id']
self.assertValidEntity( self.assertValidEntity(
service_provider, ref=ref_service_providers[id], service_provider, ref=ref_service_providers[id],
keys_to_check=self.SP_KEYS) keys_to_check=self.SP_KEYS)
self.head(url, expected_status=http_client.OK) self.head(url, expected_status=http.client.OK)
def test_update_service_provider(self): def test_update_service_provider(self):
"""Update existing service provider. """Update existing service provider.
Update default existing service provider and make sure it has been Update default existing service provider and make sure it has been
properly changed. properly changed.
""" """
new_sp_ref = core.new_service_provider_ref() new_sp_ref = core.new_service_provider_ref()
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
skipping to change at line 4688 skipping to change at line 4748
def test_update_service_provider_immutable_parameters(self): def test_update_service_provider_immutable_parameters(self):
"""Update immutable attributes in service provider. """Update immutable attributes in service provider.
In this particular case the test will try to change ``id`` attribute. In this particular case the test will try to change ``id`` attribute.
The server should return an HTTP 403 Forbidden error code. The server should return an HTTP 403 Forbidden error code.
""" """
new_sp_ref = {'id': uuid.uuid4().hex} new_sp_ref = {'id': uuid.uuid4().hex}
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
self.patch(url, body={'service_provider': new_sp_ref}, self.patch(url, body={'service_provider': new_sp_ref},
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
def test_update_service_provider_unknown_parameter(self): def test_update_service_provider_unknown_parameter(self):
new_sp_ref = core.new_service_provider_ref() new_sp_ref = core.new_service_provider_ref()
new_sp_ref[uuid.uuid4().hex] = uuid.uuid4().hex new_sp_ref[uuid.uuid4().hex] = uuid.uuid4().hex
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
self.patch(url, body={'service_provider': new_sp_ref}, self.patch(url, body={'service_provider': new_sp_ref},
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
def test_update_service_provider_returns_not_found(self): def test_update_service_provider_returns_not_found(self):
new_sp_ref = core.new_service_provider_ref() new_sp_ref = core.new_service_provider_ref()
new_sp_ref['description'] = uuid.uuid4().hex new_sp_ref['description'] = uuid.uuid4().hex
url = self.base_url(suffix=uuid.uuid4().hex) url = self.base_url(suffix=uuid.uuid4().hex)
self.patch(url, body={'service_provider': new_sp_ref}, self.patch(url, body={'service_provider': new_sp_ref},
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_update_sp_relay_state(self): def test_update_sp_relay_state(self):
"""Update an SP with custom relay state.""" """Update an SP with custom relay state."""
new_sp_ref = core.new_service_provider_ref() new_sp_ref = core.new_service_provider_ref()
non_default_prefix = uuid.uuid4().hex non_default_prefix = uuid.uuid4().hex
new_sp_ref['relay_state_prefix'] = non_default_prefix new_sp_ref['relay_state_prefix'] = non_default_prefix
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
resp = self.patch(url, body={'service_provider': new_sp_ref}) resp = self.patch(url, body={'service_provider': new_sp_ref})
sp_result = resp.result['service_provider'] sp_result = resp.result['service_provider']
self.assertEqual(non_default_prefix, self.assertEqual(non_default_prefix,
sp_result['relay_state_prefix']) sp_result['relay_state_prefix'])
def test_delete_service_provider(self): def test_delete_service_provider(self):
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
self.delete(url) self.delete(url)
def test_delete_service_provider_returns_not_found(self): def test_delete_service_provider_returns_not_found(self):
url = self.base_url(suffix=uuid.uuid4().hex) url = self.base_url(suffix=uuid.uuid4().hex)
self.delete(url, expected_status=http_client.NOT_FOUND) self.delete(url, expected_status=http.client.NOT_FOUND)
def test_filter_list_sp_by_id(self): def test_filter_list_sp_by_id(self):
def get_id(resp): def get_id(resp):
sp = resp.result.get('service_provider') sp = resp.result.get('service_provider')
return sp.get('id') return sp.get('id')
sp1_id = get_id(self._create_default_sp()) sp1_id = get_id(self._create_default_sp())
sp2_id = get_id(self._create_default_sp()) sp2_id = get_id(self._create_default_sp())
# list the SP, should get SPs. # list the SP, should get SPs.
 End of changes. 140 change blocks. 
136 lines changed or deleted 196 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)