"Fossies" - the Fresh Open Source Software Archive

Member "ec2-api-12.0.0/ec2api/tests/unit/test_ec2_validate.py" (14 Apr 2021, 11227 Bytes) of package /linux/misc/openstack/ec2-api-12.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file.

    1 # Copyright 2012 Cloudscaling, Inc.
    2 # All Rights Reserved.
    3 # Copyright 2013 Red Hat, Inc.
    4 #
    5 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    6 #    not use this file except in compliance with the License. You may obtain
    7 #    a copy of the License at
    8 #
    9 #         http://www.apache.org/licenses/LICENSE-2.0
   10 #
   11 #    Unless required by applicable law or agreed to in writing, software
   12 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   13 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   14 #    License for the specific language governing permissions and limitations
   15 #    under the License.
   16 
   17 import datetime
   18 
   19 from oslo_utils import timeutils
   20 import testtools
   21 
   22 from ec2api.api import common
   23 from ec2api.api import ec2utils
   24 from ec2api import exception
   25 from ec2api.tests.unit import tools
   26 
   27 
   28 class EC2ValidationTestCase(testtools.TestCase):
   29     """Test case for various validations."""
   30 
   31     def test_validate_net(self):
   32         validator = common.Validator()
   33         validator.ip('10.10.0.0')
   34         validator.cidr('10.10.0.0/24')
   35         validator.subnet_cidr('10.10.0.0/24')
   36         validator.vpc_cidr('10.10.0.0/24')
   37 
   38         def check_raise_invalid_parameter(cidr):
   39             self.assertRaises(exception.InvalidParameterValue,
   40                               validator.cidr, cidr)
   41 
   42         check_raise_invalid_parameter('fake')
   43         check_raise_invalid_parameter('10.10/24')
   44         check_raise_invalid_parameter('10.10.0.0.0/24')
   45         check_raise_invalid_parameter('10.10.0.0')
   46         check_raise_invalid_parameter(' 10.10.0.0/24')
   47         check_raise_invalid_parameter('10.10.0.0/24 ')
   48         check_raise_invalid_parameter('.10.10.0.0/24 ')
   49         check_raise_invalid_parameter('-1.10.0.0/24')
   50         check_raise_invalid_parameter('10.256.0.0/24')
   51         check_raise_invalid_parameter('10.10.0.0/33')
   52         check_raise_invalid_parameter('10.10.0.0/-1')
   53 
   54         self.assertRaises(exception.InvalidParameterValue,
   55                           validator.ip, '10.256.0.0')
   56         self.assertRaises(exception.InvalidSubnetRange,
   57                           validator.subnet_cidr, '10.10.0.0/15')
   58         self.assertRaises(exception.InvalidVpcRange,
   59                           validator.vpc_cidr, '10.10.0.0/29')
   60 
   61     def test_validate_id(self):
   62         validator = common.Validator()
   63         validator.ec2_id('i-00000001')
   64         validator.i_id('i-00000001')
   65         validator.ami_id('ami-00000001')
   66         validator.eni_id('eni-00000001')
   67         validator.sg_id('sg-00000001')
   68         validator.subnet_id('subnet-00000001')
   69         validator.igw_id('igw-00000001')
   70         validator.rtb_id('rtb-00000001')
   71         validator.vpc_id('vpc-00000001')
   72         validator.vol_id('vol-00000001')
   73         validator.snap_id('snap-00000001')
   74         validator.dopt_id('dopt-00000001')
   75         validator.eni_attach_id('eni-attach-00000001')
   76         validator.eipalloc_id('eipalloc-00000001')
   77         validator.eipassoc_id('eipassoc-00000001')
   78         validator.rtbassoc_id('rtbassoc-00000001')
   79         validator.vgw_id('vgw-00000001')
   80         validator.cgw_id('cgw-00000001')
   81 
   82         invalid_ids = ['1234', 'a-1111', '', 'i-1111', 'i-rrr', 'foobar']
   83 
   84         def check_raise_invalid_parameters(func):
   85             for id in invalid_ids:
   86                 self.assertRaises(exception.InvalidParameterValue, func, id)
   87 
   88         check_raise_invalid_parameters(validator.ami_id)
   89         check_raise_invalid_parameters(validator.eni_id)
   90         check_raise_invalid_parameters(validator.sg_id)
   91         check_raise_invalid_parameters(validator.subnet_id)
   92         check_raise_invalid_parameters(validator.igw_id)
   93         check_raise_invalid_parameters(validator.rtb_id)
   94         check_raise_invalid_parameters(validator.vpc_id)
   95         check_raise_invalid_parameters(validator.vol_id)
   96         check_raise_invalid_parameters(validator.snap_id)
   97         check_raise_invalid_parameters(validator.dopt_id)
   98         check_raise_invalid_parameters(validator.eni_attach_id)
   99         check_raise_invalid_parameters(validator.eipalloc_id)
  100         check_raise_invalid_parameters(validator.eipassoc_id)
  101         check_raise_invalid_parameters(validator.rtbassoc_id)
  102         check_raise_invalid_parameters(validator.vgw_id)
  103         check_raise_invalid_parameters(validator.cgw_id)
  104 
  105         invalid_ids = ['1234', 'a-1111', '', 'vpc-1111', 'vpc-rrr', 'foobar']
  106 
  107         check_raise_invalid_parameters(validator.i_id)
  108 
  109         invalid_ids = ['1234', '', 'foobar']
  110 
  111         check_raise_invalid_parameters(validator.ec2_id)
  112 
  113     def test_validate_multi(self):
  114         validator = common.Validator()
  115         result_sum = {'value': 0}
  116         list_to_sum = [1, 2, 3, 4]
  117 
  118         def sum(value):
  119             # NOTE(Alex) Because nonlocal is only in python 3.0
  120             result_sum['value'] += value
  121 
  122         validator.multi(list_to_sum, sum)
  123         self.assertEqual(result_sum['value'], 10)
  124 
  125         self.assertRaises(exception.InvalidParameterValue,
  126                           validator.multi, 'not a list', sum)
  127 
  128     def test_validate_primitive(self):
  129         validator = common.Validator()
  130         validator.int(5)
  131         validator.bool(True)
  132         validator.str('str')
  133         validator.str64('str')
  134         validator.str255('str')
  135 
  136         def check_raise_validation_error(value, func):
  137             self.assertRaises(exception.ValidationError,
  138                               func, value)
  139 
  140         check_raise_validation_error('str', validator.int)
  141         check_raise_validation_error('str', validator.bool)
  142         check_raise_validation_error(5, validator.str)
  143         check_raise_validation_error('x' * 65, validator.str64)
  144         check_raise_validation_error('x' * 256, validator.str255)
  145 
  146     def test_validate_security_group(self):
  147         validator = common.Validator(params={})
  148         validator.security_group_str('name')
  149         validator.security_group_str('aa #^% -=99')
  150         validator = common.Validator(params={'vpc_id': 'vpc_id'})
  151         validator.security_group_str('name')
  152 
  153         def check_raise_validation_error(value):
  154             self.assertRaises(exception.ValidationError,
  155                               validator.security_group_str, value)
  156 
  157         validator = common.Validator(params={})
  158         check_raise_validation_error('aa \t\x01\x02\x7f')
  159         check_raise_validation_error('x' * 256)
  160 
  161         validator = common.Validator(params={'vpc_id': 'vpc_id'})
  162         check_raise_validation_error('aa #^% -=99')
  163         check_raise_validation_error('x' * 256)
  164 
  165     def test_validate_vpn_connection_type(self):
  166         validator = common.Validator()
  167         validator.vpn_connection_type('ipsec.1')
  168 
  169         invalid_ids = ['1234', 'a-1111', '', 'vpc-1111', 'vpc-rrr', 'foobar',
  170                        'ipsec1', 'openvpn', 'pptp', 'l2tp', 'freelan']
  171         for id in invalid_ids:
  172             self.assertRaises(exception.InvalidParameterValue,
  173                               validator.vpn_connection_type, id)
  174 
  175 
  176 class EC2TimestampValidationTestCase(testtools.TestCase):
  177     """Test case for EC2 request timestamp validation."""
  178 
  179     def test_validate_ec2_timestamp_valid(self):
  180         params = {'Timestamp': '2011-04-22T11:29:49Z'}
  181         expired = ec2utils.is_ec2_timestamp_expired(params)
  182         self.assertFalse(expired)
  183 
  184     @tools.screen_all_logs
  185     def test_validate_ec2_timestamp_old_format(self):
  186         params = {'Timestamp': '2011-04-22T11:29:49'}
  187         expired = ec2utils.is_ec2_timestamp_expired(params)
  188         self.assertTrue(expired)
  189 
  190     def test_validate_ec2_timestamp_not_set(self):
  191         params = {}
  192         expired = ec2utils.is_ec2_timestamp_expired(params)
  193         self.assertFalse(expired)
  194 
  195     def test_validate_ec2_timestamp_ms_time_regex(self):
  196         result = ec2utils._ms_time_regex.match('2011-04-22T11:29:49.123Z')
  197         self.assertIsNotNone(result)
  198         result = ec2utils._ms_time_regex.match('2011-04-22T11:29:49.123456Z')
  199         self.assertIsNotNone(result)
  200         result = ec2utils._ms_time_regex.match('2011-04-22T11:29:49.1234567Z')
  201         self.assertIsNone(result)
  202         result = ec2utils._ms_time_regex.match('2011-04-22T11:29:49.123')
  203         self.assertIsNone(result)
  204         result = ec2utils._ms_time_regex.match('2011-04-22T11:29:49Z')
  205         self.assertIsNone(result)
  206 
  207     @tools.screen_all_logs
  208     def test_validate_ec2_timestamp_aws_sdk_format(self):
  209         params = {'Timestamp': '2011-04-22T11:29:49.123Z'}
  210         expired = ec2utils.is_ec2_timestamp_expired(params)
  211         self.assertFalse(expired)
  212         expired = ec2utils.is_ec2_timestamp_expired(params, expires=300)
  213         self.assertTrue(expired)
  214 
  215     @tools.screen_all_logs
  216     def test_validate_ec2_timestamp_invalid_format(self):
  217         params = {'Timestamp': '2011-04-22T11:29:49.000P'}
  218         expired = ec2utils.is_ec2_timestamp_expired(params)
  219         self.assertTrue(expired)
  220 
  221     def test_validate_ec2_timestamp_advanced_time(self):
  222 
  223         # EC2 request with Timestamp in advanced time
  224         timestamp = timeutils.utcnow() + datetime.timedelta(seconds=250)
  225         params = {'Timestamp': timestamp.strftime("%Y-%m-%dT%H:%M:%SZ")}
  226         expired = ec2utils.is_ec2_timestamp_expired(params, expires=300)
  227         self.assertFalse(expired)
  228 
  229     @tools.screen_all_logs
  230     def test_validate_ec2_timestamp_advanced_time_expired(self):
  231         timestamp = timeutils.utcnow() + datetime.timedelta(seconds=350)
  232         params = {'Timestamp': timestamp.strftime("%Y-%m-%dT%H:%M:%SZ")}
  233         expired = ec2utils.is_ec2_timestamp_expired(params, expires=300)
  234         self.assertTrue(expired)
  235 
  236     def test_validate_ec2_req_timestamp_not_expired(self):
  237         params = {'Timestamp': ec2utils.isotime()}
  238         expired = ec2utils.is_ec2_timestamp_expired(params, expires=15)
  239         self.assertFalse(expired)
  240 
  241     @tools.screen_all_logs
  242     def test_validate_ec2_req_timestamp_expired(self):
  243         params = {'Timestamp': '2011-04-22T12:00:00Z'}
  244         compare = ec2utils.is_ec2_timestamp_expired(params, expires=300)
  245         self.assertTrue(compare)
  246 
  247     @tools.screen_all_logs
  248     def test_validate_ec2_req_expired(self):
  249         params = {'Expires': ec2utils.isotime()}
  250         expired = ec2utils.is_ec2_timestamp_expired(params)
  251         self.assertTrue(expired)
  252 
  253     def test_validate_ec2_req_not_expired(self):
  254         expire = timeutils.utcnow() + datetime.timedelta(seconds=350)
  255         params = {'Expires': expire.strftime("%Y-%m-%dT%H:%M:%SZ")}
  256         expired = ec2utils.is_ec2_timestamp_expired(params)
  257         self.assertFalse(expired)
  258 
  259     @tools.screen_all_logs
  260     def test_validate_Expires_timestamp_invalid_format(self):
  261 
  262         # EC2 request with invalid Expires
  263         params = {'Expires': '2011-04-22T11:29:49'}
  264         expired = ec2utils.is_ec2_timestamp_expired(params)
  265         self.assertTrue(expired)
  266 
  267     @tools.screen_all_logs
  268     def test_validate_ec2_req_timestamp_Expires(self):
  269 
  270         # EC2 request with both Timestamp and Expires
  271         params = {'Timestamp': '2011-04-22T11:29:49Z',
  272                   'Expires': ec2utils.isotime()}
  273         self.assertRaises(exception.InvalidRequest,
  274                           ec2utils.is_ec2_timestamp_expired,
  275                           params)