"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "keystone/tests/unit/test_backend_sql.py" between
keystone-16.0.1.tar.gz and keystone-17.0.0.tar.gz

About: OpenStack Keystone (Core Service: Identity) provides an authentication and authorization service for other OpenStack services. Provides a catalog of endpoints for all OpenStack services.
The "Ussuri" series (latest release).

test_backend_sql.py  (keystone-16.0.1):test_backend_sql.py  (keystone-17.0.0)
skipping to change at line 16 skipping to change at line 16
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import datetime import datetime
from unittest import mock
import uuid import uuid
import mock import freezegun
from oslo_db import exception as db_exception from oslo_db import exception as db_exception
from oslo_db import options from oslo_db import options
from six.moves import range
import sqlalchemy import sqlalchemy
from sqlalchemy import exc from sqlalchemy import exc
from testtools import matchers from testtools import matchers
from keystone.common import driver_hints from keystone.common import driver_hints
from keystone.common import provider_api from keystone.common import provider_api
from keystone.common import sql from keystone.common import sql
from keystone.common.sql import core from keystone.common.sql import core
import keystone.conf import keystone.conf
from keystone.credential.providers import fernet as credential_provider from keystone.credential.providers import fernet as credential_provider
skipping to change at line 670 skipping to change at line 670
test_groups[x]['id']) test_groups[x]['id'])
group_refs = PROVIDERS.identity_api.list_groups_for_user( group_refs = PROVIDERS.identity_api.list_groups_for_user(
positive_user['id']) positive_user['id'])
self.assertEqual(after_count, len(group_refs)) self.assertEqual(after_count, len(group_refs))
# Make sure the group count for the unrelated user # Make sure the group count for the unrelated user
# did not change # did not change
group_refs = PROVIDERS.identity_api.list_groups_for_user( group_refs = PROVIDERS.identity_api.list_groups_for_user(
negative_user['id']) negative_user['id'])
self.assertEqual(0, len(group_refs)) self.assertEqual(0, len(group_refs))
def test_add_user_to_group_expiring_mapped(self):
self._build_fed_resource()
domain = self._get_domain_fixture()
self.config_fixture.config(group='federation',
default_authorization_ttl=5)
time = datetime.datetime.utcnow()
tick = datetime.timedelta(minutes=5)
new_group = unit.new_group_ref(domain_id=domain['id'])
new_group = PROVIDERS.identity_api.create_group(new_group)
fed_dict = unit.new_federated_user_ref()
fed_dict['idp_id'] = 'myidp'
fed_dict['protocol_id'] = 'mapped'
with freezegun.freeze_time(time - tick) as frozen_time:
user = PROVIDERS.identity_api.shadow_federated_user(
**fed_dict, group_ids=[new_group['id']])
PROVIDERS.identity_api.check_user_in_group(user['id'],
new_group['id'])
# Expiration
frozen_time.tick(tick)
self.assertRaises(exception.NotFound,
PROVIDERS.identity_api.check_user_in_group,
user['id'],
new_group['id'])
# Renewal
PROVIDERS.identity_api.shadow_federated_user(
**fed_dict, group_ids=[new_group['id']])
PROVIDERS.identity_api.check_user_in_group(user['id'],
new_group['id'])
def test_add_user_to_group_expiring(self):
self._build_fed_resource()
domain = self._get_domain_fixture()
time = datetime.datetime.utcnow()
tick = datetime.timedelta(minutes=5)
new_group = unit.new_group_ref(domain_id=domain['id'])
new_group = PROVIDERS.identity_api.create_group(new_group)
fed_dict = unit.new_federated_user_ref()
fed_dict['idp_id'] = 'myidp'
fed_dict['protocol_id'] = 'mapped'
new_user = PROVIDERS.shadow_users_api.create_federated_user(
domain['id'], fed_dict
)
with freezegun.freeze_time(time - tick) as frozen_time:
PROVIDERS.shadow_users_api.add_user_to_group_expires(
new_user['id'], new_group['id'])
self.config_fixture.config(group='federation',
default_authorization_ttl=0)
self.assertRaises(exception.NotFound,
PROVIDERS.identity_api.check_user_in_group,
new_user['id'],
new_group['id'])
self.config_fixture.config(group='federation',
default_authorization_ttl=5)
PROVIDERS.identity_api.check_user_in_group(new_user['id'],
new_group['id'])
# Expiration
frozen_time.tick(tick)
self.assertRaises(exception.NotFound,
PROVIDERS.identity_api.check_user_in_group,
new_user['id'],
new_group['id'])
# Renewal
PROVIDERS.shadow_users_api.add_user_to_group_expires(
new_user['id'], new_group['id'])
PROVIDERS.identity_api.check_user_in_group(new_user['id'],
new_group['id'])
def test_add_user_to_group_expiring_list(self):
self._build_fed_resource()
domain = self._get_domain_fixture()
self.config_fixture.config(group='federation',
default_authorization_ttl=5)
time = datetime.datetime.utcnow()
tick = datetime.timedelta(minutes=5)
new_group = unit.new_group_ref(domain_id=domain['id'])
new_group = PROVIDERS.identity_api.create_group(new_group)
exp_new_group = unit.new_group_ref(domain_id=domain['id'])
exp_new_group = PROVIDERS.identity_api.create_group(exp_new_group)
fed_dict = unit.new_federated_user_ref()
fed_dict['idp_id'] = 'myidp'
fed_dict['protocol_id'] = 'mapped'
new_user = PROVIDERS.shadow_users_api.create_federated_user(
domain['id'], fed_dict
)
PROVIDERS.identity_api.add_user_to_group(new_user['id'],
new_group['id'])
PROVIDERS.identity_api.check_user_in_group(new_user['id'],
new_group['id'])
with freezegun.freeze_time(time - tick) as frozen_time:
PROVIDERS.shadow_users_api.add_user_to_group_expires(
new_user['id'], exp_new_group['id'])
PROVIDERS.identity_api.check_user_in_group(new_user['id'],
new_group['id'])
groups = PROVIDERS.identity_api.list_groups_for_user(
new_user['id'])
self.assertEqual(len(groups), 2)
for group in groups:
if group.get('membership_expires_at'):
self.assertEqual(group['membership_expires_at'], time)
frozen_time.tick(tick)
groups = PROVIDERS.identity_api.list_groups_for_user(
new_user['id'])
self.assertEqual(len(groups), 1)
def test_storing_null_domain_id_in_project_ref(self): def test_storing_null_domain_id_in_project_ref(self):
"""Test the special storage of domain_id=None in sql resource driver. """Test the special storage of domain_id=None in sql resource driver.
The resource driver uses a special value in place of None for domain_id The resource driver uses a special value in place of None for domain_id
in the project record. This shouldn't escape the driver. Hence we test in the project record. This shouldn't escape the driver. Hence we test
the interface to ensure that you can store a domain_id of None, and the interface to ensure that you can store a domain_id of None, and
that any special value used inside the driver does not escape through that any special value used inside the driver does not escape through
the interface. the interface.
""" """
 End of changes. 4 change blocks. 
2 lines changed or deleted 125 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)