"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "keystone/tests/unit/test_limits.py" between
keystone-16.0.1.tar.gz and keystone-17.0.0.tar.gz

About: OpenStack Keystone (Core Service: Identity) provides an authentication and authorization service for other OpenStack services. Provides a catalog of endpoints for all OpenStack services.
The "Ussuri" series (latest release).

test_limits.py  (keystone-16.0.1):test_limits.py  (keystone-17.0.0)
skipping to change at line 15 skipping to change at line 15
# a copy of the License at # a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from six.moves import http_client import http.client
import uuid import uuid
from keystone.common import provider_api from keystone.common import provider_api
from keystone.common.validation import validators from keystone.common.validation import validators
import keystone.conf import keystone.conf
from keystone.tests import unit from keystone.tests import unit
from keystone.tests.unit import test_v3 from keystone.tests.unit import test_v3
CONF = keystone.conf.CONF CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs PROVIDERS = provider_api.ProviderAPIs
skipping to change at line 51 skipping to change at line 51
}, },
}, },
'required': ['model'], 'required': ['model'],
'additionalProperties': False, 'additionalProperties': False,
} }
validator = validators.SchemaValidator(schema) validator = validators.SchemaValidator(schema)
response = self.get('/limits/model') response = self.get('/limits/model')
validator.validate(response.json_body) validator.validate(response.json_body)
def test_head_limit_model(self): def test_head_limit_model(self):
self.head('/limits/model', expected_status=http_client.OK) self.head('/limits/model', expected_status=http.client.OK)
def test_get_limit_model_returns_default_model(self): def test_get_limit_model_returns_default_model(self):
response = self.get('/limits/model') response = self.get('/limits/model')
model = response.result model = response.result
expected = { expected = {
'model': { 'model': {
'name': 'flat', 'name': 'flat',
'description': ( 'description': (
'Limit enforcement and validation does not take project ' 'Limit enforcement and validation does not take project '
'hierarchy into consideration.' 'hierarchy into consideration.'
) )
} }
} }
self.assertDictEqual(expected, model) self.assertDictEqual(expected, model)
def test_get_limit_model_without_token_fails(self): def test_get_limit_model_without_token_fails(self):
self.get( self.get(
'/limits/model', noauth=True, '/limits/model', noauth=True,
expected_status=http_client.UNAUTHORIZED expected_status=http.client.UNAUTHORIZED
) )
def test_head_limit_model_without_token_fails(self): def test_head_limit_model_without_token_fails(self):
self.head( self.head(
'/limits/model', noauth=True, '/limits/model', noauth=True,
expected_status=http_client.UNAUTHORIZED expected_status=http.client.UNAUTHORIZED
) )
class RegisteredLimitsTestCase(test_v3.RestfulTestCase): class RegisteredLimitsTestCase(test_v3.RestfulTestCase):
"""Test registered_limits CRUD.""" """Test registered_limits CRUD."""
def setUp(self): def setUp(self):
super(RegisteredLimitsTestCase, self).setUp() super(RegisteredLimitsTestCase, self).setUp()
# Most of these tests require system-scoped tokens. Let's have one on # Most of these tests require system-scoped tokens. Let's have one on
# hand so that we can use it in tests when we need it. # hand so that we can use it in tests when we need it.
skipping to change at line 114 skipping to change at line 114
self.service2 = response.json_body['service'] self.service2 = response.json_body['service']
self.service_id2 = self.service2['id'] self.service_id2 = self.service2['id']
def test_create_registered_limit(self): def test_create_registered_limit(self):
ref = unit.new_registered_limit_ref(service_id=self.service_id, ref = unit.new_registered_limit_ref(service_id=self.service_id,
region_id=self.region_id) region_id=self.region_id)
r = self.post( r = self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref]}, body={'registered_limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
registered_limits = r.result['registered_limits'] registered_limits = r.result['registered_limits']
for key in ['service_id', 'region_id', 'resource_name', for key in ['service_id', 'region_id', 'resource_name',
'default_limit', 'description']: 'default_limit', 'description']:
self.assertEqual(registered_limits[0][key], ref[key]) self.assertEqual(registered_limits[0][key], ref[key])
def test_create_registered_limit_without_region(self): def test_create_registered_limit_without_region(self):
ref = unit.new_registered_limit_ref(service_id=self.service_id) ref = unit.new_registered_limit_ref(service_id=self.service_id)
r = self.post( r = self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref]}, body={'registered_limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
registered_limits = r.result['registered_limits'] registered_limits = r.result['registered_limits']
for key in ['service_id', 'resource_name', 'default_limit']: for key in ['service_id', 'resource_name', 'default_limit']:
self.assertEqual(registered_limits[0][key], ref[key]) self.assertEqual(registered_limits[0][key], ref[key])
self.assertIsNone(registered_limits[0].get('region_id')) self.assertIsNone(registered_limits[0].get('region_id'))
def test_create_registered_without_description(self): def test_create_registered_without_description(self):
ref = unit.new_registered_limit_ref(service_id=self.service_id, ref = unit.new_registered_limit_ref(service_id=self.service_id,
region_id=self.region_id) region_id=self.region_id)
ref.pop('description') ref.pop('description')
r = self.post( r = self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref]}, body={'registered_limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
registered_limits = r.result['registered_limits'] registered_limits = r.result['registered_limits']
for key in ['service_id', 'region_id', 'resource_name', for key in ['service_id', 'region_id', 'resource_name',
'default_limit']: 'default_limit']:
self.assertEqual(registered_limits[0][key], ref[key]) self.assertEqual(registered_limits[0][key], ref[key])
self.assertIsNone(registered_limits[0]['description']) self.assertIsNone(registered_limits[0]['description'])
def test_create_multi_registered_limit(self): def test_create_multi_registered_limit(self):
ref1 = unit.new_registered_limit_ref(service_id=self.service_id, ref1 = unit.new_registered_limit_ref(service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume') resource_name='volume')
ref2 = unit.new_registered_limit_ref(service_id=self.service_id, ref2 = unit.new_registered_limit_ref(service_id=self.service_id,
resource_name='snapshot') resource_name='snapshot')
r = self.post( r = self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref1, ref2]}, body={'registered_limits': [ref1, ref2]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
registered_limits = r.result['registered_limits'] registered_limits = r.result['registered_limits']
for key in ['service_id', 'resource_name', 'default_limit']: for key in ['service_id', 'resource_name', 'default_limit']:
self.assertEqual(registered_limits[0][key], ref1[key]) self.assertEqual(registered_limits[0][key], ref1[key])
self.assertEqual(registered_limits[1][key], ref2[key]) self.assertEqual(registered_limits[1][key], ref2[key])
self.assertEqual(registered_limits[0]['region_id'], ref1['region_id']) self.assertEqual(registered_limits[0]['region_id'], ref1['region_id'])
self.assertIsNone(registered_limits[1].get('region_id')) self.assertIsNone(registered_limits[1].get('region_id'))
def test_create_registered_limit_return_count(self): def test_create_registered_limit_return_count(self):
ref1 = unit.new_registered_limit_ref(service_id=self.service_id, ref1 = unit.new_registered_limit_ref(service_id=self.service_id,
region_id=self.region_id) region_id=self.region_id)
r = self.post( r = self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref1]}, body={'registered_limits': [ref1]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
registered_limits = r.result['registered_limits'] registered_limits = r.result['registered_limits']
self.assertEqual(1, len(registered_limits)) self.assertEqual(1, len(registered_limits))
ref2 = unit.new_registered_limit_ref(service_id=self.service_id2, ref2 = unit.new_registered_limit_ref(service_id=self.service_id2,
region_id=self.region_id2) region_id=self.region_id2)
ref3 = unit.new_registered_limit_ref(service_id=self.service_id2) ref3 = unit.new_registered_limit_ref(service_id=self.service_id2)
r = self.post( r = self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref2, ref3]}, body={'registered_limits': [ref2, ref3]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
registered_limits = r.result['registered_limits'] registered_limits = r.result['registered_limits']
self.assertEqual(2, len(registered_limits)) self.assertEqual(2, len(registered_limits))
def test_create_registered_limit_with_invalid_input(self): def test_create_registered_limit_with_invalid_input(self):
ref1 = unit.new_registered_limit_ref() ref1 = unit.new_registered_limit_ref()
ref2 = unit.new_registered_limit_ref(default_limit='not_int') ref2 = unit.new_registered_limit_ref(default_limit='not_int')
ref3 = unit.new_registered_limit_ref(resource_name=123) ref3 = unit.new_registered_limit_ref(resource_name=123)
ref4 = unit.new_registered_limit_ref(region_id='fake_region') ref4 = unit.new_registered_limit_ref(region_id='fake_region')
for input_limit in [ref1, ref2, ref3, ref4]: for input_limit in [ref1, ref2, ref3, ref4]:
self.post( self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [input_limit]}, body={'registered_limits': [input_limit]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
def test_create_registered_limit_duplicate(self): def test_create_registered_limit_duplicate(self):
ref = unit.new_registered_limit_ref(service_id=self.service_id, ref = unit.new_registered_limit_ref(service_id=self.service_id,
region_id=self.region_id) region_id=self.region_id)
self.post( self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref]}, body={'registered_limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
self.post( self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref]}, body={'registered_limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CONFLICT) expected_status=http.client.CONFLICT)
def test_update_registered_limit(self): def test_update_registered_limit(self):
ref = unit.new_registered_limit_ref(service_id=self.service_id, ref = unit.new_registered_limit_ref(service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
default_limit=10) default_limit=10)
r = self.post( r = self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref]}, body={'registered_limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
update_ref = { update_ref = {
'service_id': self.service_id2, 'service_id': self.service_id2,
'region_id': self.region_id2, 'region_id': self.region_id2,
'resource_name': 'snapshot', 'resource_name': 'snapshot',
'default_limit': 5, 'default_limit': 5,
'description': 'test description' 'description': 'test description'
} }
r = self.patch( r = self.patch(
'/registered_limits/%s' % r.result['registered_limits'][0]['id'], '/registered_limits/%s' % r.result['registered_limits'][0]['id'],
body={'registered_limit': update_ref}, body={'registered_limit': update_ref},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.OK) expected_status=http.client.OK)
new_registered_limits = r.result['registered_limit'] new_registered_limits = r.result['registered_limit']
self.assertEqual(new_registered_limits['service_id'], self.service_id2) self.assertEqual(new_registered_limits['service_id'], self.service_id2)
self.assertEqual(new_registered_limits['region_id'], self.region_id2) self.assertEqual(new_registered_limits['region_id'], self.region_id2)
self.assertEqual(new_registered_limits['resource_name'], 'snapshot') self.assertEqual(new_registered_limits['resource_name'], 'snapshot')
self.assertEqual(new_registered_limits['default_limit'], 5) self.assertEqual(new_registered_limits['default_limit'], 5)
self.assertEqual(new_registered_limits['description'], self.assertEqual(new_registered_limits['description'],
'test description') 'test description')
def test_update_registered_limit_region_failed(self): def test_update_registered_limit_region_failed(self):
ref = unit.new_registered_limit_ref(service_id=self.service_id, ref = unit.new_registered_limit_ref(service_id=self.service_id,
resource_name='volume', resource_name='volume',
default_limit=10, default_limit=10,
description='test description') description='test description')
r = self.post( r = self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref]}, body={'registered_limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
update_ref = { update_ref = {
'region_id': self.region_id, 'region_id': self.region_id,
} }
registered_limit_id = r.result['registered_limits'][0]['id'] registered_limit_id = r.result['registered_limits'][0]['id']
r = self.patch( r = self.patch(
'/registered_limits/%s' % registered_limit_id, '/registered_limits/%s' % registered_limit_id,
body={'registered_limit': update_ref}, body={'registered_limit': update_ref},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.OK) expected_status=http.client.OK)
new_registered_limits = r.result['registered_limit'] new_registered_limits = r.result['registered_limit']
self.assertEqual(self.region_id, new_registered_limits['region_id']) self.assertEqual(self.region_id, new_registered_limits['region_id'])
update_ref['region_id'] = '' update_ref['region_id'] = ''
r = self.patch( r = self.patch(
'/registered_limits/%s' % registered_limit_id, '/registered_limits/%s' % registered_limit_id,
body={'registered_limit': update_ref}, body={'registered_limit': update_ref},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
def test_update_registered_limit_description(self): def test_update_registered_limit_description(self):
ref = unit.new_registered_limit_ref(service_id=self.service_id, ref = unit.new_registered_limit_ref(service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
default_limit=10) default_limit=10)
r = self.post( r = self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref]}, body={'registered_limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
update_ref = { update_ref = {
'description': 'test description' 'description': 'test description'
} }
registered_limit_id = r.result['registered_limits'][0]['id'] registered_limit_id = r.result['registered_limits'][0]['id']
r = self.patch( r = self.patch(
'/registered_limits/%s' % registered_limit_id, '/registered_limits/%s' % registered_limit_id,
body={'registered_limit': update_ref}, body={'registered_limit': update_ref},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.OK) expected_status=http.client.OK)
new_registered_limits = r.result['registered_limit'] new_registered_limits = r.result['registered_limit']
self.assertEqual(new_registered_limits['description'], self.assertEqual(new_registered_limits['description'],
'test description') 'test description')
update_ref['description'] = '' update_ref['description'] = ''
r = self.patch( r = self.patch(
'/registered_limits/%s' % registered_limit_id, '/registered_limits/%s' % registered_limit_id,
body={'registered_limit': update_ref}, body={'registered_limit': update_ref},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.OK) expected_status=http.client.OK)
new_registered_limits = r.result['registered_limit'] new_registered_limits = r.result['registered_limit']
self.assertEqual(new_registered_limits['description'], '') self.assertEqual(new_registered_limits['description'], '')
def test_update_registered_limit_region_id_to_none(self): def test_update_registered_limit_region_id_to_none(self):
ref = unit.new_registered_limit_ref(service_id=self.service_id, ref = unit.new_registered_limit_ref(service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
default_limit=10) default_limit=10)
r = self.post( r = self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref]}, body={'registered_limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
update_ref = { update_ref = {
'region_id': None 'region_id': None
} }
registered_limit_id = r.result['registered_limits'][0]['id'] registered_limit_id = r.result['registered_limits'][0]['id']
r = self.patch( r = self.patch(
'/registered_limits/%s' % registered_limit_id, '/registered_limits/%s' % registered_limit_id,
body={'registered_limit': update_ref}, body={'registered_limit': update_ref},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.OK) expected_status=http.client.OK)
self.assertIsNone(r.result['registered_limit']['region_id']) self.assertIsNone(r.result['registered_limit']['region_id'])
def test_update_registered_limit_region_id_to_none_conflict(self): def test_update_registered_limit_region_id_to_none_conflict(self):
ref1 = unit.new_registered_limit_ref(service_id=self.service_id, ref1 = unit.new_registered_limit_ref(service_id=self.service_id,
resource_name='volume', resource_name='volume',
default_limit=10) default_limit=10)
ref2 = unit.new_registered_limit_ref(service_id=self.service_id, ref2 = unit.new_registered_limit_ref(service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
default_limit=10) default_limit=10)
self.post( self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref1]}, body={'registered_limits': [ref1]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
r = self.post( r = self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref2]}, body={'registered_limits': [ref2]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
update_ref = { update_ref = {
'region_id': None 'region_id': None
} }
registered_limit_id = r.result['registered_limits'][0]['id'] registered_limit_id = r.result['registered_limits'][0]['id']
# There is a registered limit with "service_id=self.service_id, # There is a registered limit with "service_id=self.service_id,
# region_id=None" already. So update ref2's region_id to None will # region_id=None" already. So update ref2's region_id to None will
# raise 409 Conflict Error. # raise 409 Conflict Error.
self.patch( self.patch(
'/registered_limits/%s' % registered_limit_id, '/registered_limits/%s' % registered_limit_id,
body={'registered_limit': update_ref}, body={'registered_limit': update_ref},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CONFLICT) expected_status=http.client.CONFLICT)
def test_update_registered_limit_not_found(self): def test_update_registered_limit_not_found(self):
update_ref = { update_ref = {
'service_id': self.service_id, 'service_id': self.service_id,
'region_id': self.region_id, 'region_id': self.region_id,
'resource_name': 'snapshot', 'resource_name': 'snapshot',
'default_limit': 5 'default_limit': 5
} }
self.patch( self.patch(
'/registered_limits/%s' % uuid.uuid4().hex, '/registered_limits/%s' % uuid.uuid4().hex,
body={'registered_limit': update_ref}, body={'registered_limit': update_ref},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_update_registered_limit_with_invalid_input(self): def test_update_registered_limit_with_invalid_input(self):
ref = unit.new_registered_limit_ref(service_id=self.service_id, ref = unit.new_registered_limit_ref(service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
default_limit=10) default_limit=10)
r = self.post( r = self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref]}, body={'registered_limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
reg_id = r.result['registered_limits'][0]['id'] reg_id = r.result['registered_limits'][0]['id']
update_ref1 = unit.new_registered_limit_ref(service_id='fake_id') update_ref1 = unit.new_registered_limit_ref(service_id='fake_id')
update_ref2 = unit.new_registered_limit_ref(default_limit='not_int') update_ref2 = unit.new_registered_limit_ref(default_limit='not_int')
update_ref3 = unit.new_registered_limit_ref(resource_name=123) update_ref3 = unit.new_registered_limit_ref(resource_name=123)
update_ref4 = unit.new_registered_limit_ref(region_id='fake_region') update_ref4 = unit.new_registered_limit_ref(region_id='fake_region')
update_ref5 = unit.new_registered_limit_ref(description=123) update_ref5 = unit.new_registered_limit_ref(description=123)
for input_limit in [update_ref1, update_ref2, update_ref3, for input_limit in [update_ref1, update_ref2, update_ref3,
update_ref4, update_ref5]: update_ref4, update_ref5]:
self.patch( self.patch(
'/registered_limits/%s' % reg_id, '/registered_limits/%s' % reg_id,
body={'registered_limit': input_limit}, body={'registered_limit': input_limit},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
def test_update_registered_limit_with_referenced_limit(self): def test_update_registered_limit_with_referenced_limit(self):
ref = unit.new_registered_limit_ref(service_id=self.service_id, ref = unit.new_registered_limit_ref(service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
default_limit=10) default_limit=10)
r = self.post( r = self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref]}, body={'registered_limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
ref = unit.new_limit_ref(project_id=self.project_id, ref = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume') resource_name='volume')
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
update_ref = { update_ref = {
'service_id': self.service_id2, 'service_id': self.service_id2,
'region_id': self.region_id2, 'region_id': self.region_id2,
'resource_name': 'snapshot', 'resource_name': 'snapshot',
'default_limit': 5 'default_limit': 5
} }
self.patch( self.patch(
'/registered_limits/%s' % r.result['registered_limits'][0]['id'], '/registered_limits/%s' % r.result['registered_limits'][0]['id'],
body={'registered_limit': update_ref}, body={'registered_limit': update_ref},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_list_registered_limit(self): def test_list_registered_limit(self):
r = self.get( r = self.get(
'/registered_limits', '/registered_limits',
expected_status=http_client.OK) expected_status=http.client.OK)
self.assertEqual([], r.result.get('registered_limits')) self.assertEqual([], r.result.get('registered_limits'))
ref1 = unit.new_registered_limit_ref(service_id=self.service_id, ref1 = unit.new_registered_limit_ref(service_id=self.service_id,
resource_name='test_resource', resource_name='test_resource',
region_id=self.region_id) region_id=self.region_id)
ref2 = unit.new_registered_limit_ref(service_id=self.service_id2, ref2 = unit.new_registered_limit_ref(service_id=self.service_id2,
resource_name='test_resource', resource_name='test_resource',
region_id=self.region_id2) region_id=self.region_id2)
r = self.post( r = self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref1, ref2]}, body={'registered_limits': [ref1, ref2]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
id1 = r.result['registered_limits'][0]['id'] id1 = r.result['registered_limits'][0]['id']
r = self.get( r = self.get(
'/registered_limits', '/registered_limits',
expected_status=http_client.OK) expected_status=http.client.OK)
registered_limits = r.result['registered_limits'] registered_limits = r.result['registered_limits']
self.assertEqual(len(registered_limits), 2) self.assertEqual(len(registered_limits), 2)
for key in ['service_id', 'region_id', 'resource_name', for key in ['service_id', 'region_id', 'resource_name',
'default_limit']: 'default_limit']:
if registered_limits[0]['id'] == id1: if registered_limits[0]['id'] == id1:
self.assertEqual(registered_limits[0][key], ref1[key]) self.assertEqual(registered_limits[0][key], ref1[key])
self.assertEqual(registered_limits[1][key], ref2[key]) self.assertEqual(registered_limits[1][key], ref2[key])
break break
self.assertEqual(registered_limits[1][key], ref1[key]) self.assertEqual(registered_limits[1][key], ref1[key])
self.assertEqual(registered_limits[0][key], ref2[key]) self.assertEqual(registered_limits[0][key], ref2[key])
r = self.get( r = self.get(
'/registered_limits?service_id=%s' % self.service_id, '/registered_limits?service_id=%s' % self.service_id,
expected_status=http_client.OK) expected_status=http.client.OK)
registered_limits = r.result['registered_limits'] registered_limits = r.result['registered_limits']
self.assertEqual(len(registered_limits), 1) self.assertEqual(len(registered_limits), 1)
for key in ['service_id', 'region_id', 'resource_name', for key in ['service_id', 'region_id', 'resource_name',
'default_limit']: 'default_limit']:
self.assertEqual(registered_limits[0][key], ref1[key]) self.assertEqual(registered_limits[0][key], ref1[key])
r = self.get( r = self.get(
'/registered_limits?region_id=%s' % self.region_id2, '/registered_limits?region_id=%s' % self.region_id2,
expected_status=http_client.OK) expected_status=http.client.OK)
registered_limits = r.result['registered_limits'] registered_limits = r.result['registered_limits']
self.assertEqual(len(registered_limits), 1) self.assertEqual(len(registered_limits), 1)
for key in ['service_id', 'region_id', 'resource_name', for key in ['service_id', 'region_id', 'resource_name',
'default_limit']: 'default_limit']:
self.assertEqual(registered_limits[0][key], ref2[key]) self.assertEqual(registered_limits[0][key], ref2[key])
r = self.get( r = self.get(
'/registered_limits?resource_name=test_resource', '/registered_limits?resource_name=test_resource',
expected_status=http_client.OK) expected_status=http.client.OK)
registered_limits = r.result['registered_limits'] registered_limits = r.result['registered_limits']
self.assertEqual(len(registered_limits), 2) self.assertEqual(len(registered_limits), 2)
def test_show_registered_limit(self): def test_show_registered_limit(self):
ref1 = unit.new_registered_limit_ref(service_id=self.service_id, ref1 = unit.new_registered_limit_ref(service_id=self.service_id,
region_id=self.region_id) region_id=self.region_id)
ref2 = unit.new_registered_limit_ref(service_id=self.service_id2, ref2 = unit.new_registered_limit_ref(service_id=self.service_id2,
region_id=self.region_id2) region_id=self.region_id2)
r = self.post( r = self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref1, ref2]}, body={'registered_limits': [ref1, ref2]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
id1 = r.result['registered_limits'][0]['id'] id1 = r.result['registered_limits'][0]['id']
self.get( self.get(
'/registered_limits/fake_id', '/registered_limits/fake_id',
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
r = self.get( r = self.get(
'/registered_limits/%s' % id1, '/registered_limits/%s' % id1,
expected_status=http_client.OK) expected_status=http.client.OK)
registered_limit = r.result['registered_limit'] registered_limit = r.result['registered_limit']
for key in ['service_id', 'region_id', 'resource_name', for key in ['service_id', 'region_id', 'resource_name',
'default_limit', 'description']: 'default_limit', 'description']:
self.assertEqual(registered_limit[key], ref1[key]) self.assertEqual(registered_limit[key], ref1[key])
def test_delete_registered_limit(self): def test_delete_registered_limit(self):
ref1 = unit.new_registered_limit_ref(service_id=self.service_id, ref1 = unit.new_registered_limit_ref(service_id=self.service_id,
region_id=self.region_id) region_id=self.region_id)
ref2 = unit.new_registered_limit_ref(service_id=self.service_id2, ref2 = unit.new_registered_limit_ref(service_id=self.service_id2,
region_id=self.region_id2) region_id=self.region_id2)
r = self.post( r = self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref1, ref2]}, body={'registered_limits': [ref1, ref2]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
id1 = r.result['registered_limits'][0]['id'] id1 = r.result['registered_limits'][0]['id']
self.delete('/registered_limits/%s' % id1, self.delete('/registered_limits/%s' % id1,
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.NO_CONTENT) expected_status=http.client.NO_CONTENT)
self.delete('/registered_limits/fake_id', self.delete('/registered_limits/fake_id',
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
r = self.get( r = self.get(
'/registered_limits', '/registered_limits',
expected_status=http_client.OK) expected_status=http.client.OK)
registered_limits = r.result['registered_limits'] registered_limits = r.result['registered_limits']
self.assertEqual(len(registered_limits), 1) self.assertEqual(len(registered_limits), 1)
def test_delete_registered_limit_with_referenced_limit(self): def test_delete_registered_limit_with_referenced_limit(self):
ref = unit.new_registered_limit_ref(service_id=self.service_id, ref = unit.new_registered_limit_ref(service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
default_limit=10) default_limit=10)
r = self.post( r = self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref]}, body={'registered_limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
ref = unit.new_limit_ref(project_id=self.project_id, ref = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume') resource_name='volume')
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
id = r.result['registered_limits'][0]['id'] id = r.result['registered_limits'][0]['id']
self.delete('/registered_limits/%s' % id, self.delete('/registered_limits/%s' % id,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
class LimitsTestCase(test_v3.RestfulTestCase): class LimitsTestCase(test_v3.RestfulTestCase):
"""Test limits CRUD.""" """Test limits CRUD."""
def setUp(self): def setUp(self):
super(LimitsTestCase, self).setUp() super(LimitsTestCase, self).setUp()
# FIXME(lbragstad): Remove all this duplicated logic once we get all # FIXME(lbragstad): Remove all this duplicated logic once we get all
# keystone tests using bootstrap consistently. This is something the # keystone tests using bootstrap consistently. This is something the
# bootstrap utility already does for us. # bootstrap utility already does for us.
reader_role = {'id': uuid.uuid4().hex, 'name': 'reader'} reader_role = {'id': uuid.uuid4().hex, 'name': 'reader'}
skipping to change at line 611 skipping to change at line 611
resource_name='volume') resource_name='volume')
ref2 = unit.new_registered_limit_ref(service_id=self.service_id2, ref2 = unit.new_registered_limit_ref(service_id=self.service_id2,
resource_name='snapshot') resource_name='snapshot')
ref3 = unit.new_registered_limit_ref(service_id=self.service_id, ref3 = unit.new_registered_limit_ref(service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='backup') resource_name='backup')
self.post( self.post(
'/registered_limits', '/registered_limits',
body={'registered_limits': [ref1, ref2, ref3]}, body={'registered_limits': [ref1, ref2, ref3]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
# Create more assignments, all are: # Create more assignments, all are:
# #
# self.user -- admin -- self.project # self.user -- admin -- self.project
# self.user -- non-admin -- self.project_2 # self.user -- non-admin -- self.project_2
# self.user -- admin -- self.domain # self.user -- admin -- self.domain
# self.user -- non-admin -- self.domain_2 # self.user -- non-admin -- self.domain_2
# self.user -- admin -- system # self.user -- admin -- system
self.project_2 = unit.new_project_ref(domain_id=self.domain_id) self.project_2 = unit.new_project_ref(domain_id=self.domain_id)
self.project_2_id = self.project_2['id'] self.project_2_id = self.project_2['id']
skipping to change at line 651 skipping to change at line 651
def test_create_project_limit(self): def test_create_project_limit(self):
ref = unit.new_limit_ref(project_id=self.project_id, ref = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume') resource_name='volume')
r = self.post( r = self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
limits = r.result['limits'] limits = r.result['limits']
self.assertIsNotNone(limits[0]['id']) self.assertIsNotNone(limits[0]['id'])
self.assertIsNone(limits[0]['domain_id']) self.assertIsNone(limits[0]['domain_id'])
for key in ['service_id', 'region_id', 'resource_name', for key in ['service_id', 'region_id', 'resource_name',
'resource_limit', 'description', 'project_id']: 'resource_limit', 'description', 'project_id']:
self.assertEqual(limits[0][key], ref[key]) self.assertEqual(limits[0][key], ref[key])
def test_create_domain_limit(self): def test_create_domain_limit(self):
ref = unit.new_limit_ref(domain_id=self.domain_id, ref = unit.new_limit_ref(domain_id=self.domain_id,
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume') resource_name='volume')
r = self.post( r = self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
limits = r.result['limits'] limits = r.result['limits']
self.assertIsNotNone(limits[0]['id']) self.assertIsNotNone(limits[0]['id'])
self.assertIsNone(limits[0]['project_id']) self.assertIsNone(limits[0]['project_id'])
for key in ['service_id', 'region_id', 'resource_name', for key in ['service_id', 'region_id', 'resource_name',
'resource_limit', 'description', 'domain_id']: 'resource_limit', 'description', 'domain_id']:
self.assertEqual(limits[0][key], ref[key]) self.assertEqual(limits[0][key], ref[key])
def test_create_limit_without_region(self): def test_create_limit_without_region(self):
ref = unit.new_limit_ref(project_id=self.project_id, ref = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id2, service_id=self.service_id2,
resource_name='snapshot') resource_name='snapshot')
r = self.post( r = self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
limits = r.result['limits'] limits = r.result['limits']
self.assertIsNotNone(limits[0]['id']) self.assertIsNotNone(limits[0]['id'])
self.assertIsNotNone(limits[0]['project_id']) self.assertIsNotNone(limits[0]['project_id'])
for key in ['service_id', 'resource_name', 'resource_limit']: for key in ['service_id', 'resource_name', 'resource_limit']:
self.assertEqual(limits[0][key], ref[key]) self.assertEqual(limits[0][key], ref[key])
self.assertIsNone(limits[0].get('region_id')) self.assertIsNone(limits[0].get('region_id'))
def test_create_limit_without_description(self): def test_create_limit_without_description(self):
ref = unit.new_limit_ref(project_id=self.project_id, ref = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume') resource_name='volume')
ref.pop('description') ref.pop('description')
r = self.post( r = self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
limits = r.result['limits'] limits = r.result['limits']
self.assertIsNotNone(limits[0]['id']) self.assertIsNotNone(limits[0]['id'])
self.assertIsNotNone(limits[0]['project_id']) self.assertIsNotNone(limits[0]['project_id'])
for key in ['service_id', 'region_id', 'resource_name', for key in ['service_id', 'region_id', 'resource_name',
'resource_limit']: 'resource_limit']:
self.assertEqual(limits[0][key], ref[key]) self.assertEqual(limits[0][key], ref[key])
self.assertIsNone(limits[0]['description']) self.assertIsNone(limits[0]['description'])
def test_create_limit_with_domain_as_project(self): def test_create_limit_with_domain_as_project(self):
skipping to change at line 738 skipping to change at line 738
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume') resource_name='volume')
ref2 = unit.new_limit_ref(project_id=self.project_id, ref2 = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id2, service_id=self.service_id2,
resource_name='snapshot') resource_name='snapshot')
r = self.post( r = self.post(
'/limits', '/limits',
body={'limits': [ref1, ref2]}, body={'limits': [ref1, ref2]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
limits = r.result['limits'] limits = r.result['limits']
for key in ['service_id', 'resource_name', 'resource_limit']: for key in ['service_id', 'resource_name', 'resource_limit']:
self.assertEqual(limits[0][key], ref1[key]) self.assertEqual(limits[0][key], ref1[key])
self.assertEqual(limits[1][key], ref2[key]) self.assertEqual(limits[1][key], ref2[key])
self.assertEqual(limits[0]['region_id'], ref1['region_id']) self.assertEqual(limits[0]['region_id'], ref1['region_id'])
self.assertIsNone(limits[1].get('region_id')) self.assertIsNone(limits[1].get('region_id'))
def test_create_limit_return_count(self): def test_create_limit_return_count(self):
ref1 = unit.new_limit_ref(project_id=self.project_id, ref1 = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume') resource_name='volume')
r = self.post( r = self.post(
'/limits', '/limits',
body={'limits': [ref1]}, body={'limits': [ref1]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
limits = r.result['limits'] limits = r.result['limits']
self.assertEqual(1, len(limits)) self.assertEqual(1, len(limits))
ref2 = unit.new_limit_ref(project_id=self.project_id, ref2 = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id2, service_id=self.service_id2,
resource_name='snapshot') resource_name='snapshot')
ref3 = unit.new_limit_ref(project_id=self.project_id, ref3 = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='backup') resource_name='backup')
r = self.post( r = self.post(
'/limits', '/limits',
body={'limits': [ref2, ref3]}, body={'limits': [ref2, ref3]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
limits = r.result['limits'] limits = r.result['limits']
self.assertEqual(2, len(limits)) self.assertEqual(2, len(limits))
def test_create_limit_with_invalid_input(self): def test_create_limit_with_invalid_input(self):
ref1 = unit.new_limit_ref(project_id=self.project_id, ref1 = unit.new_limit_ref(project_id=self.project_id,
resource_limit='not_int') resource_limit='not_int')
ref2 = unit.new_limit_ref(project_id=self.project_id, ref2 = unit.new_limit_ref(project_id=self.project_id,
resource_name=123) resource_name=123)
ref3 = unit.new_limit_ref(project_id=self.project_id, ref3 = unit.new_limit_ref(project_id=self.project_id,
region_id='fake_region') region_id='fake_region')
for input_limit in [ref1, ref2, ref3]: for input_limit in [ref1, ref2, ref3]:
self.post( self.post(
'/limits', '/limits',
body={'limits': [input_limit]}, body={'limits': [input_limit]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
def test_create_limit_duplicate(self): def test_create_limit_duplicate(self):
ref = unit.new_limit_ref(project_id=self.project_id, ref = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume') resource_name='volume')
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CONFLICT) expected_status=http.client.CONFLICT)
def test_create_limit_without_reference_registered_limit(self): def test_create_limit_without_reference_registered_limit(self):
ref = unit.new_limit_ref(project_id=self.project_id, ref = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id2, region_id=self.region_id2,
resource_name='volume') resource_name='volume')
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_update_limit(self): def test_update_limit(self):
ref = unit.new_limit_ref(project_id=self.project_id, ref = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=10) resource_limit=10)
r = self.post( r = self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
update_ref = { update_ref = {
'resource_limit': 5, 'resource_limit': 5,
'description': 'test description' 'description': 'test description'
} }
r = self.patch( r = self.patch(
'/limits/%s' % r.result['limits'][0]['id'], '/limits/%s' % r.result['limits'][0]['id'],
body={'limit': update_ref}, body={'limit': update_ref},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.OK) expected_status=http.client.OK)
new_limits = r.result['limit'] new_limits = r.result['limit']
self.assertEqual(new_limits['resource_limit'], 5) self.assertEqual(new_limits['resource_limit'], 5)
self.assertEqual(new_limits['description'], 'test description') self.assertEqual(new_limits['description'], 'test description')
def test_update_limit_not_found(self): def test_update_limit_not_found(self):
update_ref = { update_ref = {
'resource_limit': 5 'resource_limit': 5
} }
self.patch( self.patch(
'/limits/%s' % uuid.uuid4().hex, '/limits/%s' % uuid.uuid4().hex,
body={'limit': update_ref}, body={'limit': update_ref},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
def test_update_limit_with_invalid_input(self): def test_update_limit_with_invalid_input(self):
ref = unit.new_limit_ref(project_id=self.project_id, ref = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=10) resource_limit=10)
r = self.post( r = self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
limit_id = r.result['limits'][0]['id'] limit_id = r.result['limits'][0]['id']
invalid_resource_limit_update = { invalid_resource_limit_update = {
'resource_limit': 'not_int' 'resource_limit': 'not_int'
} }
invalid_description_update = { invalid_description_update = {
'description': 123 'description': 123
} }
for input_limit in [invalid_resource_limit_update, for input_limit in [invalid_resource_limit_update,
invalid_description_update]: invalid_description_update]:
self.patch( self.patch(
'/limits/%s' % limit_id, '/limits/%s' % limit_id,
body={'limit': input_limit}, body={'limit': input_limit},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.BAD_REQUEST) expected_status=http.client.BAD_REQUEST)
def test_list_limit(self): def test_list_limit(self):
r = self.get( r = self.get(
'/limits', '/limits',
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.OK) expected_status=http.client.OK)
self.assertEqual([], r.result.get('limits')) self.assertEqual([], r.result.get('limits'))
ref1 = unit.new_limit_ref(project_id=self.project_id, ref1 = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume') resource_name='volume')
ref2 = unit.new_limit_ref(project_id=self.project_id, ref2 = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id2, service_id=self.service_id2,
resource_name='snapshot') resource_name='snapshot')
r = self.post( r = self.post(
'/limits', '/limits',
body={'limits': [ref1, ref2]}, body={'limits': [ref1, ref2]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
id1 = r.result['limits'][0]['id'] id1 = r.result['limits'][0]['id']
r = self.get( r = self.get(
'/limits', '/limits',
expected_status=http_client.OK) expected_status=http.client.OK)
limits = r.result['limits'] limits = r.result['limits']
self.assertEqual(len(limits), 2) self.assertEqual(len(limits), 2)
if limits[0]['id'] == id1: if limits[0]['id'] == id1:
self.assertEqual(limits[0]['region_id'], ref1['region_id']) self.assertEqual(limits[0]['region_id'], ref1['region_id'])
self.assertIsNone(limits[1].get('region_id')) self.assertIsNone(limits[1].get('region_id'))
for key in ['service_id', 'resource_name', 'resource_limit']: for key in ['service_id', 'resource_name', 'resource_limit']:
self.assertEqual(limits[0][key], ref1[key]) self.assertEqual(limits[0][key], ref1[key])
self.assertEqual(limits[1][key], ref2[key]) self.assertEqual(limits[1][key], ref2[key])
else: else:
self.assertEqual(limits[1]['region_id'], ref1['region_id']) self.assertEqual(limits[1]['region_id'], ref1['region_id'])
self.assertIsNone(limits[0].get('region_id')) self.assertIsNone(limits[0].get('region_id'))
for key in ['service_id', 'resource_name', 'resource_limit']: for key in ['service_id', 'resource_name', 'resource_limit']:
self.assertEqual(limits[1][key], ref1[key]) self.assertEqual(limits[1][key], ref1[key])
self.assertEqual(limits[0][key], ref2[key]) self.assertEqual(limits[0][key], ref2[key])
r = self.get( r = self.get(
'/limits?service_id=%s' % self.service_id2, '/limits?service_id=%s' % self.service_id2,
expected_status=http_client.OK) expected_status=http.client.OK)
limits = r.result['limits'] limits = r.result['limits']
self.assertEqual(len(limits), 1) self.assertEqual(len(limits), 1)
for key in ['service_id', 'resource_name', 'resource_limit']: for key in ['service_id', 'resource_name', 'resource_limit']:
self.assertEqual(limits[0][key], ref2[key]) self.assertEqual(limits[0][key], ref2[key])
r = self.get( r = self.get(
'/limits?region_id=%s' % self.region_id, '/limits?region_id=%s' % self.region_id,
expected_status=http_client.OK) expected_status=http.client.OK)
limits = r.result['limits'] limits = r.result['limits']
self.assertEqual(len(limits), 1) self.assertEqual(len(limits), 1)
for key in ['service_id', 'region_id', 'resource_name', for key in ['service_id', 'region_id', 'resource_name',
'resource_limit']: 'resource_limit']:
self.assertEqual(limits[0][key], ref1[key]) self.assertEqual(limits[0][key], ref1[key])
r = self.get( r = self.get(
'/limits?resource_name=volume', '/limits?resource_name=volume',
expected_status=http_client.OK) expected_status=http.client.OK)
limits = r.result['limits'] limits = r.result['limits']
self.assertEqual(len(limits), 1) self.assertEqual(len(limits), 1)
for key in ['service_id', 'region_id', 'resource_name', for key in ['service_id', 'region_id', 'resource_name',
'resource_limit']: 'resource_limit']:
self.assertEqual(limits[0][key], ref1[key]) self.assertEqual(limits[0][key], ref1[key])
def test_list_limit_with_project_id_filter(self): def test_list_limit_with_project_id_filter(self):
# create two limit in different projects for test. # create two limit in different projects for test.
self.config_fixture.config(group='oslo_policy', self.config_fixture.config(group='oslo_policy',
enforce_scope=True) enforce_scope=True)
skipping to change at line 956 skipping to change at line 956
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume') resource_name='volume')
ref2 = unit.new_limit_ref(project_id=self.project_2_id, ref2 = unit.new_limit_ref(project_id=self.project_2_id,
service_id=self.service_id2, service_id=self.service_id2,
resource_name='snapshot') resource_name='snapshot')
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref1, ref2]}, body={'limits': [ref1, ref2]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
# non system scoped request will get the limits in its project. # non system scoped request will get the limits in its project.
r = self.get('/limits', expected_status=http_client.OK) r = self.get('/limits', expected_status=http.client.OK)
limits = r.result['limits'] limits = r.result['limits']
self.assertEqual(1, len(limits)) self.assertEqual(1, len(limits))
self.assertEqual(self.project_id, limits[0]['project_id']) self.assertEqual(self.project_id, limits[0]['project_id'])
r = self.get( r = self.get(
'/limits', expected_status=http_client.OK, '/limits', expected_status=http.client.OK,
auth=self.build_authentication_request( auth=self.build_authentication_request(
user_id=self.user['id'], password=self.user['password'], user_id=self.user['id'], password=self.user['password'],
project_id=self.project_2_id)) project_id=self.project_2_id))
limits = r.result['limits'] limits = r.result['limits']
self.assertEqual(1, len(limits)) self.assertEqual(1, len(limits))
self.assertEqual(self.project_2_id, limits[0]['project_id']) self.assertEqual(self.project_2_id, limits[0]['project_id'])
# any project user can filter by their own project # any project user can filter by their own project
r = self.get( r = self.get(
'/limits?project_id=%s' % self.project_id, '/limits?project_id=%s' % self.project_id,
expected_status=http_client.OK) expected_status=http.client.OK)
limits = r.result['limits'] limits = r.result['limits']
self.assertEqual(1, len(limits)) self.assertEqual(1, len(limits))
self.assertEqual(self.project_id, limits[0]['project_id']) self.assertEqual(self.project_id, limits[0]['project_id'])
# a system scoped request can specify the project_id filter # a system scoped request can specify the project_id filter
r = self.get( r = self.get(
'/limits?project_id=%s' % self.project_id, '/limits?project_id=%s' % self.project_id,
expected_status=http_client.OK, expected_status=http.client.OK,
token=self.system_admin_token token=self.system_admin_token
) )
limits = r.result['limits'] limits = r.result['limits']
self.assertEqual(1, len(limits)) self.assertEqual(1, len(limits))
self.assertEqual(self.project_id, limits[0]['project_id']) self.assertEqual(self.project_id, limits[0]['project_id'])
def test_list_limit_with_domain_id_filter(self): def test_list_limit_with_domain_id_filter(self):
# create two limit in different domains for test. # create two limit in different domains for test.
ref1 = unit.new_limit_ref(domain_id=self.domain_id, ref1 = unit.new_limit_ref(domain_id=self.domain_id,
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume') resource_name='volume')
ref2 = unit.new_limit_ref(domain_id=self.domain_2_id, ref2 = unit.new_limit_ref(domain_id=self.domain_2_id,
service_id=self.service_id2, service_id=self.service_id2,
resource_name='snapshot') resource_name='snapshot')
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref1, ref2]}, body={'limits': [ref1, ref2]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
# non system scoped request will get the limits in its domain. # non system scoped request will get the limits in its domain.
r = self.get( r = self.get(
'/limits', expected_status=http_client.OK, '/limits', expected_status=http.client.OK,
auth=self.build_authentication_request( auth=self.build_authentication_request(
user_id=self.user['id'], password=self.user['password'], user_id=self.user['id'], password=self.user['password'],
domain_id=self.domain_id)) domain_id=self.domain_id))
limits = r.result['limits'] limits = r.result['limits']
self.assertEqual(1, len(limits)) self.assertEqual(1, len(limits))
self.assertEqual(self.domain_id, limits[0]['domain_id']) self.assertEqual(self.domain_id, limits[0]['domain_id'])
r = self.get( r = self.get(
'/limits', expected_status=http_client.OK, '/limits', expected_status=http.client.OK,
auth=self.build_authentication_request( auth=self.build_authentication_request(
user_id=self.user['id'], password=self.user['password'], user_id=self.user['id'], password=self.user['password'],
domain_id=self.domain_2_id)) domain_id=self.domain_2_id))
limits = r.result['limits'] limits = r.result['limits']
self.assertEqual(1, len(limits)) self.assertEqual(1, len(limits))
self.assertEqual(self.domain_2_id, limits[0]['domain_id']) self.assertEqual(self.domain_2_id, limits[0]['domain_id'])
# if non system scoped request contain domain_id filter, keystone # if non system scoped request contain domain_id filter, keystone
# will return an empty list. # will return an empty list.
r = self.get( r = self.get(
'/limits?domain_id=%s' % self.domain_id, '/limits?domain_id=%s' % self.domain_id,
expected_status=http_client.OK) expected_status=http.client.OK)
limits = r.result['limits'] limits = r.result['limits']
self.assertEqual(0, len(limits)) self.assertEqual(0, len(limits))
# a system scoped request can specify the domain_id filter # a system scoped request can specify the domain_id filter
r = self.get( r = self.get(
'/limits?domain_id=%s' % self.domain_id, '/limits?domain_id=%s' % self.domain_id,
expected_status=http_client.OK, expected_status=http.client.OK,
auth=self.build_authentication_request( auth=self.build_authentication_request(
user_id=self.user['id'], password=self.user['password'], user_id=self.user['id'], password=self.user['password'],
system=True)) system=True))
limits = r.result['limits'] limits = r.result['limits']
self.assertEqual(1, len(limits)) self.assertEqual(1, len(limits))
self.assertEqual(self.domain_id, limits[0]['domain_id']) self.assertEqual(self.domain_id, limits[0]['domain_id'])
def test_show_project_limit(self): def test_show_project_limit(self):
ref1 = unit.new_limit_ref(project_id=self.project_id, ref1 = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume') resource_name='volume')
ref2 = unit.new_limit_ref(project_id=self.project_id, ref2 = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id2, service_id=self.service_id2,
resource_name='snapshot') resource_name='snapshot')
r = self.post( r = self.post(
'/limits', '/limits',
body={'limits': [ref1, ref2]}, body={'limits': [ref1, ref2]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
if r.result['limits'][0]['resource_name'] == 'volume': if r.result['limits'][0]['resource_name'] == 'volume':
id1 = r.result['limits'][0]['id'] id1 = r.result['limits'][0]['id']
else: else:
id1 = r.result['limits'][1]['id'] id1 = r.result['limits'][1]['id']
self.get('/limits/fake_id', self.get('/limits/fake_id',
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
r = self.get('/limits/%s' % id1, r = self.get('/limits/%s' % id1,
expected_status=http_client.OK) expected_status=http.client.OK)
limit = r.result['limit'] limit = r.result['limit']
self.assertIsNone(limit['domain_id']) self.assertIsNone(limit['domain_id'])
for key in ['service_id', 'region_id', 'resource_name', for key in ['service_id', 'region_id', 'resource_name',
'resource_limit', 'description', 'project_id']: 'resource_limit', 'description', 'project_id']:
self.assertEqual(limit[key], ref1[key]) self.assertEqual(limit[key], ref1[key])
def test_show_domain_limit(self): def test_show_domain_limit(self):
ref1 = unit.new_limit_ref(domain_id=self.domain_id, ref1 = unit.new_limit_ref(domain_id=self.domain_id,
service_id=self.service_id2, service_id=self.service_id2,
resource_name='snapshot') resource_name='snapshot')
r = self.post( r = self.post(
'/limits', '/limits',
body={'limits': [ref1]}, body={'limits': [ref1]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
id1 = r.result['limits'][0]['id'] id1 = r.result['limits'][0]['id']
r = self.get('/limits/%s' % id1, r = self.get('/limits/%s' % id1,
expected_status=http_client.OK, expected_status=http.client.OK,
auth=self.build_authentication_request( auth=self.build_authentication_request(
user_id=self.user['id'], user_id=self.user['id'],
password=self.user['password'], password=self.user['password'],
domain_id=self.domain_id)) domain_id=self.domain_id))
limit = r.result['limit'] limit = r.result['limit']
self.assertIsNone(limit['project_id']) self.assertIsNone(limit['project_id'])
self.assertIsNone(limit['region_id']) self.assertIsNone(limit['region_id'])
for key in ['service_id', 'resource_name', 'resource_limit', for key in ['service_id', 'resource_name', 'resource_limit',
'description', 'domain_id']: 'description', 'domain_id']:
self.assertEqual(limit[key], ref1[key]) self.assertEqual(limit[key], ref1[key])
skipping to change at line 1108 skipping to change at line 1108
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume') resource_name='volume')
ref2 = unit.new_limit_ref(project_id=self.project_id, ref2 = unit.new_limit_ref(project_id=self.project_id,
service_id=self.service_id2, service_id=self.service_id2,
resource_name='snapshot') resource_name='snapshot')
r = self.post( r = self.post(
'/limits', '/limits',
body={'limits': [ref1, ref2]}, body={'limits': [ref1, ref2]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
id1 = r.result['limits'][0]['id'] id1 = r.result['limits'][0]['id']
self.delete('/limits/%s' % id1, self.delete('/limits/%s' % id1,
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.NO_CONTENT) expected_status=http.client.NO_CONTENT)
self.delete('/limits/fake_id', self.delete('/limits/fake_id',
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.NOT_FOUND) expected_status=http.client.NOT_FOUND)
r = self.get( r = self.get(
'/limits', '/limits',
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.OK) expected_status=http.client.OK)
limits = r.result['limits'] limits = r.result['limits']
self.assertEqual(len(limits), 1) self.assertEqual(len(limits), 1)
class StrictTwoLevelLimitsTestCase(LimitsTestCase): class StrictTwoLevelLimitsTestCase(LimitsTestCase):
def setUp(self): def setUp(self):
super(StrictTwoLevelLimitsTestCase, self).setUp() super(StrictTwoLevelLimitsTestCase, self).setUp()
# Most of these tests require system-scoped tokens. Let's have one on # Most of these tests require system-scoped tokens. Let's have one on
# hand so that we can use it in tests when we need it. # hand so that we can use it in tests when we need it.
PROVIDERS.assignment_api.create_system_grant_for_user( PROVIDERS.assignment_api.create_system_grant_for_user(
skipping to change at line 1182 skipping to change at line 1182
# B C B,15 C,18 # B C B,15 C,18
ref = unit.new_limit_ref(domain_id=self.domain_A['id'], ref = unit.new_limit_ref(domain_id=self.domain_A['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=20) resource_limit=20)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
ref = unit.new_limit_ref(project_id=self.project_B['id'], ref = unit.new_limit_ref(project_id=self.project_B['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=15) resource_limit=15)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
ref = unit.new_limit_ref(project_id=self.project_C['id'], ref = unit.new_limit_ref(project_id=self.project_C['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=18) resource_limit=18)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
def test_create_child_limit_break_hierarchical_tree(self): def test_create_child_limit_break_hierarchical_tree(self):
# when A is 20, success to create B to 15, but fail to create C to 21. # when A is 20, success to create B to 15, but fail to create C to 21.
# A,20 A,20 # A,20 A,20
# / \ --> / \ # / \ --> / \
# B C B,15 C # B C B,15 C
# #
# A,20 A,20 # A,20 A,20
# / \ -/-> / \ # / \ -/-> / \
# B,15 C B,15 C,21 # B,15 C B,15 C,21
ref = unit.new_limit_ref(domain_id=self.domain_A['id'], ref = unit.new_limit_ref(domain_id=self.domain_A['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=20) resource_limit=20)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
ref = unit.new_limit_ref(project_id=self.project_B['id'], ref = unit.new_limit_ref(project_id=self.project_B['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=15) resource_limit=15)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
ref = unit.new_limit_ref(project_id=self.project_C['id'], ref = unit.new_limit_ref(project_id=self.project_C['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=21) resource_limit=21)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_create_child_with_default_parent(self): def test_create_child_with_default_parent(self):
# If A is not set, the default value is 10 (from registered limit). # If A is not set, the default value is 10 (from registered limit).
# success to create B to 5, but fail to create C to 11. # success to create B to 5, but fail to create C to 11.
# A(10) A(10) # A(10) A(10)
# / \ --> / \ # / \ --> / \
# B C B,5 C # B C B,5 C
# #
# A(10) A(10) # A(10) A(10)
# / \ -/-> / \ # / \ -/-> / \
# B,5 C B,5 C,11 # B,5 C B,5 C,11
ref = unit.new_limit_ref(project_id=self.project_B['id'], ref = unit.new_limit_ref(project_id=self.project_B['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=5) resource_limit=5)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
ref = unit.new_limit_ref(project_id=self.project_C['id'], ref = unit.new_limit_ref(project_id=self.project_C['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=11) resource_limit=11)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_create_parent_limit(self): def test_create_parent_limit(self):
# When B is 9 , success to set A to 12 # When B is 9 , success to set A to 12
# A A,12 # A A,12
# / \ --> / \ # / \ --> / \
# B,9 C B,9 C # B,9 C B,9 C
ref = unit.new_limit_ref(project_id=self.project_B['id'], ref = unit.new_limit_ref(project_id=self.project_B['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=9) resource_limit=9)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
ref = unit.new_limit_ref(domain_id=self.domain_A['id'], ref = unit.new_limit_ref(domain_id=self.domain_A['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=12) resource_limit=12)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
def test_create_parent_limit_break_hierarchical_tree(self): def test_create_parent_limit_break_hierarchical_tree(self):
# When B is 9 , fail to set A to 8 # When B is 9 , fail to set A to 8
# A A,8 # A A,8
# / \ -/-> / \ # / \ -/-> / \
# B,9 C B,9 C # B,9 C B,9 C
ref = unit.new_limit_ref(project_id=self.project_B['id'], ref = unit.new_limit_ref(project_id=self.project_B['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=9) resource_limit=9)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
ref = unit.new_limit_ref(domain_id=self.domain_A['id'], ref = unit.new_limit_ref(domain_id=self.domain_A['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=8) resource_limit=8)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref]}, body={'limits': [ref]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_create_multi_limits(self): def test_create_multi_limits(self):
# success to create a tree in one request like: # success to create a tree in one request like:
# A,12 D,9 # A,12 D,9
# / \ / \ # / \ / \
# B,9 C,5 E,5 F,4 # B,9 C,5 E,5 F,4
ref_A = unit.new_limit_ref(domain_id=self.domain_A['id'], ref_A = unit.new_limit_ref(domain_id=self.domain_A['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
skipping to change at line 1373 skipping to change at line 1373
resource_limit=5) resource_limit=5)
ref_F = unit.new_limit_ref(project_id=self.project_F['id'], ref_F = unit.new_limit_ref(project_id=self.project_F['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=4) resource_limit=4)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref_A, ref_B, ref_C, ref_D, ref_E, ref_F]}, body={'limits': [ref_A, ref_B, ref_C, ref_D, ref_E, ref_F]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
def test_create_multi_limits_invalid_input(self): def test_create_multi_limits_invalid_input(self):
# fail to create a tree in one request like: # fail to create a tree in one request like:
# A,12 D,9 # A,12 D,9
# / \ / \ # / \ / \
# B,9 C,5 E,5 F,10 # B,9 C,5 E,5 F,10
# because F will break the second limit tree. # because F will break the second limit tree.
ref_A = unit.new_limit_ref(domain_id=self.domain_A['id'], ref_A = unit.new_limit_ref(domain_id=self.domain_A['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
skipping to change at line 1415 skipping to change at line 1415
resource_limit=5) resource_limit=5)
ref_F = unit.new_limit_ref(project_id=self.project_F['id'], ref_F = unit.new_limit_ref(project_id=self.project_F['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=10) resource_limit=10)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref_A, ref_B, ref_C, ref_D, ref_E, ref_F]}, body={'limits': [ref_A, ref_B, ref_C, ref_D, ref_E, ref_F]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_create_multi_limits_break_hierarchical_tree(self): def test_create_multi_limits_break_hierarchical_tree(self):
# when there is some hierarchical_trees already like: # when there is some hierarchical_trees already like:
# A,12 D # A,12 D
# / \ / \ # / \ / \
# B,9 C E,5 F # B,9 C E,5 F
# fail to set C to 5 and D to 4 in one request like: # fail to set C to 5 and D to 4 in one request like:
# A,12 D,4 # A,12 D,4
# / \ / \ # / \ / \
# B,9 C,5 E,5 F # B,9 C,5 E,5 F
skipping to change at line 1446 skipping to change at line 1446
resource_limit=9) resource_limit=9)
ref_E = unit.new_limit_ref(project_id=self.project_E['id'], ref_E = unit.new_limit_ref(project_id=self.project_E['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=5) resource_limit=5)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref_A, ref_B, ref_E]}, body={'limits': [ref_A, ref_B, ref_E]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
ref_C = unit.new_limit_ref(project_id=self.project_C['id'], ref_C = unit.new_limit_ref(project_id=self.project_C['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=5) resource_limit=5)
ref_D = unit.new_limit_ref(domain_id=self.domain_D['id'], ref_D = unit.new_limit_ref(domain_id=self.domain_D['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=4) resource_limit=4)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref_C, ref_D]}, body={'limits': [ref_C, ref_D]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_update_child_limit(self): def test_update_child_limit(self):
# Success to update C to 9 # Success to update C to 9
# A,10 A,10 # A,10 A,10
# / \ --> / \ # / \ --> / \
# B,6 C,7 B,6 C,9 # B,6 C,7 B,6 C,9
ref_A = unit.new_limit_ref(domain_id=self.domain_A['id'], ref_A = unit.new_limit_ref(domain_id=self.domain_A['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
skipping to change at line 1488 skipping to change at line 1488
resource_limit=6) resource_limit=6)
ref_C = unit.new_limit_ref(project_id=self.project_C['id'], ref_C = unit.new_limit_ref(project_id=self.project_C['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=7) resource_limit=7)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref_A, ref_B]}, body={'limits': [ref_A, ref_B]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
r = self.post( r = self.post(
'/limits', '/limits',
body={'limits': [ref_C]}, body={'limits': [ref_C]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
update_dict = {'resource_limit': 9} update_dict = {'resource_limit': 9}
self.patch( self.patch(
'/limits/%s' % r.result['limits'][0]['id'], '/limits/%s' % r.result['limits'][0]['id'],
body={'limit': update_dict}, body={'limit': update_dict},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.OK) expected_status=http.client.OK)
def test_update_child_limit_break_hierarchical_tree(self): def test_update_child_limit_break_hierarchical_tree(self):
# Fail to update C to 11 # Fail to update C to 11
# A,10 A,10 # A,10 A,10
# / \ -/-> / \ # / \ -/-> / \
# B,6 C,7 B,6 C,11 # B,6 C,7 B,6 C,11
ref_A = unit.new_limit_ref(domain_id=self.domain_A['id'], ref_A = unit.new_limit_ref(domain_id=self.domain_A['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
skipping to change at line 1526 skipping to change at line 1526
resource_limit=6) resource_limit=6)
ref_C = unit.new_limit_ref(project_id=self.project_C['id'], ref_C = unit.new_limit_ref(project_id=self.project_C['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=7) resource_limit=7)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref_A, ref_B]}, body={'limits': [ref_A, ref_B]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
r = self.post( r = self.post(
'/limits', '/limits',
body={'limits': [ref_C]}, body={'limits': [ref_C]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
update_dict = {'resource_limit': 11} update_dict = {'resource_limit': 11}
self.patch( self.patch(
'/limits/%s' % r.result['limits'][0]['id'], '/limits/%s' % r.result['limits'][0]['id'],
body={'limit': update_dict}, body={'limit': update_dict},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_update_child_limit_with_default_parent(self): def test_update_child_limit_with_default_parent(self):
# If A is not set, the default value is 10 (from registered limit). # If A is not set, the default value is 10 (from registered limit).
# Success to update C to 9 but fail to update C to 11 # Success to update C to 9 but fail to update C to 11
# A,(10) A,(10) # A,(10) A,(10)
# / \ --> / \ # / \ --> / \
# B, C,7 B C,9 # B, C,7 B C,9
# #
# A,(10) A,(10) # A,(10) A,(10)
# / \ -/-> / \ # / \ -/-> / \
# B, C,7 B C,11 # B, C,7 B C,11
ref_C = unit.new_limit_ref(project_id=self.project_C['id'], ref_C = unit.new_limit_ref(project_id=self.project_C['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=7) resource_limit=7)
r = self.post( r = self.post(
'/limits', '/limits',
body={'limits': [ref_C]}, body={'limits': [ref_C]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
update_dict = {'resource_limit': 9} update_dict = {'resource_limit': 9}
self.patch( self.patch(
'/limits/%s' % r.result['limits'][0]['id'], '/limits/%s' % r.result['limits'][0]['id'],
body={'limit': update_dict}, body={'limit': update_dict},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.OK) expected_status=http.client.OK)
update_dict = {'resource_limit': 11} update_dict = {'resource_limit': 11}
self.patch( self.patch(
'/limits/%s' % r.result['limits'][0]['id'], '/limits/%s' % r.result['limits'][0]['id'],
body={'limit': update_dict}, body={'limit': update_dict},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
def test_update_parent_limit(self): def test_update_parent_limit(self):
# Success to update A to 8 # Success to update A to 8
# A,10 A,8 # A,10 A,8
# / \ --> / \ # / \ --> / \
# B,6 C,7 B,6 C,7 # B,6 C,7 B,6 C,7
ref_A = unit.new_limit_ref(domain_id=self.domain_A['id'], ref_A = unit.new_limit_ref(domain_id=self.domain_A['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
skipping to change at line 1599 skipping to change at line 1599
resource_limit=6) resource_limit=6)
ref_C = unit.new_limit_ref(project_id=self.project_C['id'], ref_C = unit.new_limit_ref(project_id=self.project_C['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=7) resource_limit=7)
r = self.post( r = self.post(
'/limits', '/limits',
body={'limits': [ref_A]}, body={'limits': [ref_A]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref_B, ref_C]}, body={'limits': [ref_B, ref_C]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
update_dict = {'resource_limit': 8} update_dict = {'resource_limit': 8}
self.patch( self.patch(
'/limits/%s' % r.result['limits'][0]['id'], '/limits/%s' % r.result['limits'][0]['id'],
body={'limit': update_dict}, body={'limit': update_dict},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.OK) expected_status=http.client.OK)
def test_update_parent_limit_break_hierarchical_tree(self): def test_update_parent_limit_break_hierarchical_tree(self):
# Fail to update A to 6 # Fail to update A to 6
# A,10 A,6 # A,10 A,6
# / \ -/-> / \ # / \ -/-> / \
# B,6 C,7 B,6 C,7 # B,6 C,7 B,6 C,7
ref_A = unit.new_limit_ref(domain_id=self.domain_A['id'], ref_A = unit.new_limit_ref(domain_id=self.domain_A['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
skipping to change at line 1637 skipping to change at line 1637
resource_limit=6) resource_limit=6)
ref_C = unit.new_limit_ref(project_id=self.project_C['id'], ref_C = unit.new_limit_ref(project_id=self.project_C['id'],
service_id=self.service_id, service_id=self.service_id,
region_id=self.region_id, region_id=self.region_id,
resource_name='volume', resource_name='volume',
resource_limit=7) resource_limit=7)
r = self.post( r = self.post(
'/limits', '/limits',
body={'limits': [ref_A]}, body={'limits': [ref_A]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
self.post( self.post(
'/limits', '/limits',
body={'limits': [ref_B, ref_C]}, body={'limits': [ref_B, ref_C]},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.CREATED) expected_status=http.client.CREATED)
update_dict = {'resource_limit': 6} update_dict = {'resource_limit': 6}
self.patch( self.patch(
'/limits/%s' % r.result['limits'][0]['id'], '/limits/%s' % r.result['limits'][0]['id'],
body={'limit': update_dict}, body={'limit': update_dict},
token=self.system_admin_token, token=self.system_admin_token,
expected_status=http_client.FORBIDDEN) expected_status=http.client.FORBIDDEN)
 End of changes. 121 change blocks. 
120 lines changed or deleted 120 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)