"Fossies" - the Fresh Open Source Software Archive

Member "ec2-api-12.0.0/ec2api/tests/unit/test_security_group.py" (14 Apr 2021, 26693 Bytes) of package /linux/misc/openstack/ec2-api-12.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the last Fossies "Diffs" side-by-side code changes report for "test_security_group.py": 9.0.0_vs_10.0.0.

    1 # Copyright 2014
    2 # The Cloudscaling Group, Inc.
    3 #
    4 # Licensed under the Apache License, Version 2.0 (the "License");
    5 # you may not use this file except in compliance with the License.
    6 # You may obtain a copy of the License at
    7 # http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   12 # See the License for the specific language governing permissions and
   13 # limitations under the License.
   14 
   15 
   16 import copy
   17 from unittest import mock
   18 
   19 from neutronclient.common import exceptions as neutron_exception
   20 
   21 from ec2api.tests.unit import base
   22 from ec2api.tests.unit import fakes
   23 from ec2api.tests.unit import matchers
   24 from ec2api.tests.unit import tools
   25 
   26 
   27 class SecurityGroupTestCase(base.ApiTestCase):
   28 
   29     def setUp(self):
   30         super(SecurityGroupTestCase, self).setUp()
   31 
   32     def test_create_security_group(self):
   33         self.set_mock_db_items(fakes.DB_VPC_1,
   34                                fakes.DB_SECURITY_GROUP_1)
   35         self.neutron.list_security_groups.return_value = (
   36             {'security_groups': [copy.deepcopy(fakes.OS_SECURITY_GROUP_1)]})
   37         self.db_api.add_item.return_value = fakes.DB_SECURITY_GROUP_2
   38         self.neutron.create_security_group.return_value = (
   39             {'security_group': copy.deepcopy(fakes.OS_SECURITY_GROUP_2)})
   40 
   41         resp = self.execute(
   42             'CreateSecurityGroup',
   43             {'GroupName': 'groupname',
   44              'GroupDescription': 'Group description'})
   45         secgroup_body = (
   46             {'security_group': {'name': 'groupname',
   47                                 'description': 'Group description'}})
   48         self.neutron.create_security_group.assert_called_once_with(
   49             secgroup_body)
   50         db_group = tools.purge_dict(fakes.DB_SECURITY_GROUP_2, ('id',))
   51         db_group['vpc_id'] = None
   52         self.db_api.add_item.assert_called_once_with(mock.ANY, 'sg', db_group)
   53         self.neutron.create_security_group.reset_mock()
   54         self.db_api.add_item.reset_mock()
   55 
   56         self.neutron.list_security_groups.return_value = (
   57             {'security_groups': [copy.deepcopy(fakes.OS_SECURITY_GROUP_1)]})
   58         resp = self.execute(
   59             'CreateSecurityGroup',
   60             {'VpcId': fakes.ID_EC2_VPC_1,
   61              'GroupName': 'groupname',
   62              'GroupDescription': 'Group description'})
   63         self.assertEqual(fakes.ID_EC2_SECURITY_GROUP_2, resp['groupId'])
   64         self.db_api.add_item.assert_called_once_with(
   65             mock.ANY, 'sg',
   66             tools.purge_dict(fakes.DB_SECURITY_GROUP_2, ('id',)))
   67         self.neutron.create_security_group.assert_called_once_with(
   68             secgroup_body)
   69         self.neutron.create_security_group.reset_mock()
   70         self.db_api.add_item.reset_mock()
   71 
   72         self.configure(disable_ec2_classic=True)
   73         self.add_mock_db_items(fakes.DB_VPC_DEFAULT,
   74                                fakes.DB_SECURITY_GROUP_DEFAULT)
   75         self.neutron.create_security_group.return_value = (
   76             {'security_group': copy.deepcopy(fakes.OS_SECURITY_GROUP_5)})
   77         self.neutron.list_security_groups.return_value = (
   78             {'security_groups': [copy.deepcopy(fakes.OS_SECURITY_GROUP_1),
   79                                  fakes.OS_SECURITY_GROUP_DEFAULT]})
   80         self.db_api.add_item.return_value = fakes.DB_SECURITY_GROUP_5
   81 
   82         resp = self.execute(
   83             'CreateSecurityGroup',
   84             {'GroupName': 'groupname2',
   85              'GroupDescription': 'Group description'})
   86         self.assertEqual(fakes.ID_EC2_SECURITY_GROUP_5, resp['groupId'])
   87         self.db_api.add_item.assert_called_once_with(
   88             mock.ANY, 'sg',
   89             tools.purge_dict(fakes.DB_SECURITY_GROUP_5, ('id',)))
   90         secgroup_body = (
   91             {'security_group': {'name': 'groupname2',
   92                                 'description': 'Group description'}})
   93         self.neutron.create_security_group.assert_called_once_with(
   94             secgroup_body)
   95 
   96     def test_create_security_group_invalid(self):
   97 
   98         def do_check(args, error_code):
   99             self.neutron.reset_mock()
  100             self.db_api.reset_mock()
  101             self.assert_execution_error(
  102                 error_code, 'CreateSecurityGroup', args)
  103 
  104         self.set_mock_db_items()
  105         do_check({'VpcId': fakes.ID_EC2_VPC_1,
  106                   'GroupName': 'groupname',
  107                   'GroupDescription': 'Group description'},
  108                  'InvalidVpcID.NotFound')
  109         self.db_api.get_item_by_id.assert_called_once_with(mock.ANY,
  110                                                            fakes.ID_EC2_VPC_1)
  111 
  112         do_check({'VpcId': fakes.ID_EC2_VPC_1,
  113                   'GroupName': 'aa #^% -=99',
  114                   'GroupDescription': 'Group description'},
  115                  'ValidationError')
  116 
  117         do_check({'VpcId': fakes.ID_EC2_VPC_1,
  118                   'GroupName': 'groupname',
  119                   'GroupDescription': 'aa #^% -=99'},
  120                  'ValidationError')
  121 
  122         do_check({'GroupName': 'aa \t\x01\x02\x7f',
  123                   'GroupDescription': 'Group description'},
  124                  'ValidationError')
  125 
  126         do_check({'GroupName': 'groupname',
  127                   'GroupDescription': 'aa \t\x01\x02\x7f'},
  128                  'ValidationError')
  129 
  130         do_check({'GroupName': 'x' * 256,
  131                   'GroupDescription': 'Group description'},
  132                  'ValidationError')
  133 
  134         do_check({'GroupName': 'groupname',
  135                   'GroupDescription': 'x' * 256},
  136                  'ValidationError')
  137 
  138         do_check({'GroupName': 'groupname'},
  139                  'MissingParameter')
  140 
  141         do_check({'GroupDescription': 'description'},
  142                  'MissingParameter')
  143 
  144         self.set_mock_db_items(fakes.DB_SECURITY_GROUP_2, fakes.DB_VPC_1)
  145         self.neutron.list_security_groups.return_value = (
  146             {'security_groups': [fakes.OS_SECURITY_GROUP_2]})
  147         do_check({'VpcId': fakes.ID_EC2_VPC_1,
  148                   'GroupName': fakes.OS_SECURITY_GROUP_2['name'],
  149                   'GroupDescription': 'description'},
  150                  'InvalidGroup.Duplicate')
  151 
  152     def test_create_security_group_over_quota(self):
  153         self.neutron.create_security_group.side_effect = (
  154             neutron_exception.OverQuotaClient(413))
  155         self.assert_execution_error(
  156             'ResourceLimitExceeded', 'CreateSecurityGroup',
  157             {'VpcId': fakes.ID_EC2_VPC_1,
  158              'GroupName': 'groupname',
  159              'GroupDescription': 'Group description'})
  160         secgroup_body = (
  161             {'security_group': {'name': 'groupname',
  162                                 'description': 'Group description'}})
  163         self.neutron.create_security_group.assert_called_once_with(
  164             secgroup_body)
  165 
  166     @tools.screen_unexpected_exception_logs
  167     def test_create_security_group_rollback(self):
  168         self.set_mock_db_items(fakes.DB_VPC_1)
  169         self.db_api.add_item.side_effect = Exception()
  170         self.neutron.create_security_group.return_value = (
  171             {'security_group': copy.deepcopy(fakes.OS_SECURITY_GROUP_1)})
  172         self.assert_execution_error(
  173             self.ANY_EXECUTE_ERROR, 'CreateSecurityGroup',
  174             {'VpcId': fakes.ID_EC2_VPC_1,
  175              'GroupName': 'groupname',
  176              'GroupDescription': 'Group description'})
  177         self.neutron.delete_security_group.assert_called_once_with(
  178             fakes.ID_OS_SECURITY_GROUP_1)
  179 
  180     def test_delete_security_group(self):
  181         self.set_mock_db_items(fakes.DB_SECURITY_GROUP_1)
  182         resp = self.execute(
  183             'DeleteSecurityGroup',
  184             {'GroupId':
  185              fakes.ID_EC2_SECURITY_GROUP_1})
  186         self.assertEqual(True, resp['return'])
  187         self.db_api.get_item_by_id.assert_any_call(
  188             mock.ANY,
  189             fakes.ID_EC2_SECURITY_GROUP_1)
  190         self.db_api.delete_item.assert_called_once_with(
  191             mock.ANY,
  192             fakes.ID_EC2_SECURITY_GROUP_1)
  193         self.neutron.delete_security_group.assert_called_once_with(
  194             fakes.ID_OS_SECURITY_GROUP_1)
  195 
  196         self.db_api.delete_item.reset_mock()
  197         self.neutron.delete_security_group.reset_mock()
  198 
  199         self.configure(disable_ec2_classic=True)
  200         self.add_mock_db_items(fakes.DB_VPC_DEFAULT,
  201                                fakes.DB_SECURITY_GROUP_DEFAULT,
  202                                fakes.DB_SECURITY_GROUP_2,
  203                                fakes.DB_SECURITY_GROUP_6)
  204         self.neutron.list_security_groups.return_value = (
  205             {'security_groups': [copy.deepcopy(fakes.OS_SECURITY_GROUP_1),
  206                                  fakes.OS_SECURITY_GROUP_2,
  207                                  fakes.OS_SECURITY_GROUP_4,
  208                                  fakes.OS_SECURITY_GROUP_DEFAULT]})
  209         self.assert_execution_error(
  210             'InvalidGroup.NotFound', 'DeleteSecurityGroup',
  211             {'GroupName': 'groupname2'})
  212 
  213         self.db_api.delete_item.reset_mock()
  214         self.neutron.delete_security_group.reset_mock()
  215 
  216         self.add_mock_db_items(fakes.DB_SECURITY_GROUP_5)
  217         self.neutron.list_security_groups.return_value = (
  218             {'security_groups': [copy.deepcopy(fakes.OS_SECURITY_GROUP_1),
  219                                  fakes.OS_SECURITY_GROUP_2,
  220                                  fakes.OS_SECURITY_GROUP_4,
  221                                  fakes.OS_SECURITY_GROUP_5,
  222                                  fakes.OS_SECURITY_GROUP_DEFAULT]})
  223         resp = self.execute(
  224             'DeleteSecurityGroup',
  225             {'GroupName': 'groupname2'})
  226         self.assertEqual(True, resp['return'])
  227         self.db_api.get_item_by_id.assert_any_call(
  228             mock.ANY,
  229             fakes.ID_EC2_SECURITY_GROUP_5)
  230         self.db_api.delete_item.assert_called_with(
  231             mock.ANY,
  232             fakes.ID_EC2_SECURITY_GROUP_5)
  233         self.neutron.delete_security_group.assert_called_once_with(
  234             fakes.ID_OS_SECURITY_GROUP_5)
  235 
  236     # NOTE(Alex) This test is disabled because it checks using non-AWS id.
  237     @base.skip_not_implemented
  238     def test_delete_security_group_nova_os_id(self):
  239         self.nova.security_groups.list.return_value = (
  240             [fakes.OS_SECURITY_GROUP_1,
  241              fakes.OS_SECURITY_GROUP_2])
  242         resp = self.execute(
  243             'DeleteSecurityGroup',
  244             {'GroupId':
  245              fakes.ID_OS_SECURITY_GROUP_2})
  246         self.assertEqual(True, resp['return'])
  247         self.nova.security_groups.delete.assert_called_once_with(
  248             fakes.ID_OS_SECURITY_GROUP_2)
  249 
  250     def test_delete_security_group_invalid(self):
  251         self.set_mock_db_items(fakes.DB_SECURITY_GROUP_1, fakes.DB_VPC_1)
  252         self.neutron.show_security_group.return_value = (
  253             {'security_group': fakes.OS_SECURITY_GROUP_1})
  254         self.assert_execution_error(
  255             'CannotDelete', 'DeleteSecurityGroup',
  256             {'GroupId': fakes.ID_EC2_SECURITY_GROUP_1})
  257         self.assert_execution_error(
  258             'InvalidGroup.NotFound', 'DeleteSecurityGroup',
  259             {'GroupId': fakes.ID_EC2_SECURITY_GROUP_2})
  260         self.assertEqual(0, self.neutron.delete_port.call_count)
  261         self.assert_execution_error(
  262             'InvalidGroup.NotFound', 'DeleteSecurityGroup',
  263             {'GroupName': 'badname'})
  264         self.assertEqual(0, self.neutron.delete_port.call_count)
  265         self.assert_execution_error(
  266             'MissingParameter', 'DeleteSecurityGroup', {})
  267         self.assertEqual(0, self.neutron.delete_port.call_count)
  268 
  269     def test_delete_security_group_is_in_use(self):
  270         self.set_mock_db_items(fakes.DB_SECURITY_GROUP_1)
  271         self.neutron.delete_security_group.side_effect = (
  272             neutron_exception.Conflict())
  273         self.assert_execution_error(
  274             'DependencyViolation', 'DeleteSecurityGroup',
  275             {'GroupId': fakes.ID_EC2_SECURITY_GROUP_1})
  276         self.assertEqual(0, self.db_api.delete_item.call_count)
  277 
  278     def test_describe_security_groups(self):
  279         self.set_mock_db_items(fakes.DB_SECURITY_GROUP_1,
  280                                fakes.DB_SECURITY_GROUP_2,
  281                                fakes.DB_SECURITY_GROUP_3,
  282                                fakes.DB_SECURITY_GROUP_4,
  283                                fakes.DB_SECURITY_GROUP_5,)
  284         self.neutron.list_security_groups.return_value = (
  285             {'security_groups': [copy.deepcopy(fakes.OS_SECURITY_GROUP_1),
  286                                  fakes.OS_SECURITY_GROUP_2,
  287                                  fakes.OS_SECURITY_GROUP_3,
  288                                  fakes.OS_SECURITY_GROUP_4,
  289                                  fakes.OS_SECURITY_GROUP_5]})
  290 
  291         resp = self.execute('DescribeSecurityGroups', {})
  292         self.assertThat(resp['securityGroupInfo'],
  293                         matchers.ListMatches(
  294                             [fakes.EC2_SECURITY_GROUP_1,
  295                              fakes.EC2_SECURITY_GROUP_2,
  296                              fakes.EC2_SECURITY_GROUP_3,
  297                              fakes.EC2_SECURITY_GROUP_4,
  298                              fakes.EC2_SECURITY_GROUP_5],
  299                             orderless_lists=True))
  300 
  301         resp = self.execute('DescribeSecurityGroups',
  302                             {'GroupName.1': 'groupname2'})
  303         self.assertThat(resp['securityGroupInfo'],
  304                         matchers.ListMatches(
  305                             [fakes.EC2_SECURITY_GROUP_4],
  306                             orderless_lists=True))
  307         self.assertEqual(0, self.db_api.delete_item.call_count)
  308 
  309         self.db_api.get_items_by_ids = tools.CopyingMock(
  310             return_value=[fakes.DB_SECURITY_GROUP_4])
  311 
  312         resp = self.execute('DescribeSecurityGroups',
  313                             {'GroupId.1': fakes.ID_EC2_SECURITY_GROUP_4})
  314         self.assertThat(resp['securityGroupInfo'],
  315                         matchers.ListMatches(
  316                             [fakes.EC2_SECURITY_GROUP_4],
  317                             orderless_lists=True))
  318         self.db_api.get_items_by_ids.assert_called_once_with(
  319             mock.ANY, set([fakes.ID_EC2_SECURITY_GROUP_4]))
  320         self.assertEqual(0, self.db_api.delete_item.call_count)
  321 
  322         self.check_filtering(
  323             'DescribeSecurityGroups', 'securityGroupInfo',
  324             [('vpc-id', fakes.ID_EC2_VPC_1),
  325              ('group-name', fakes.NAME_DEFAULT_OS_SECURITY_GROUP),
  326              ('group-id', fakes.ID_EC2_SECURITY_GROUP_1),
  327              ('description', fakes.EC2_SECURITY_GROUP_1['groupDescription']),
  328              ('ip-permission.protocol', 'tcp'),
  329              ('ip-permission.to-port', 10),
  330              ('ip-permission.from-port', 10),
  331              ('ip-permission.cidr', '192.168.1.0/24'),
  332              # TODO(andrey-mp): declare this data in fakes
  333              # ('ip-permission.group-id', fakes.ID_EC2_SECURITY_GROUP_1),
  334              # ('ip-permission.group-name', 'default'),
  335              # ('ip-permission.user-id', fakes.ID_OS_PROJECT),
  336              ('owner-id', fakes.ID_OS_PROJECT)])
  337         self.check_tag_support(
  338             'DescribeSecurityGroups', 'securityGroupInfo',
  339             fakes.ID_EC2_SECURITY_GROUP_4, 'groupId')
  340 
  341         self.configure(disable_ec2_classic=True)
  342         self.add_mock_db_items(fakes.DB_VPC_DEFAULT,
  343                                fakes.DB_SECURITY_GROUP_6)
  344         resp = self.execute('DescribeSecurityGroups',
  345                             {'GroupName.1': 'groupname2'})
  346         self.assertThat(resp['securityGroupInfo'],
  347                         matchers.ListMatches(
  348                             [fakes.EC2_SECURITY_GROUP_5],
  349                             orderless_lists=True))
  350 
  351     @mock.patch('ec2api.api.ec2utils.check_and_create_default_vpc')
  352     def test_describe_security_groups_no_default_vpc(self, check_and_create):
  353         self.configure(disable_ec2_classic=True)
  354 
  355         def mock_check_and_create(context):
  356             self.set_mock_db_items(fakes.DB_VPC_DEFAULT,
  357                                    fakes.DB_SECURITY_GROUP_DEFAULT)
  358             self.neutron.list_security_groups.return_value = (
  359                 {'security_groups': [fakes.OS_SECURITY_GROUP_DEFAULT]})
  360         check_and_create.side_effect = mock_check_and_create
  361 
  362         resp = self.execute('DescribeSecurityGroups', {})
  363         self.assertThat(resp['securityGroupInfo'],
  364                         matchers.ListMatches([
  365                             fakes.EC2_SECURITY_GROUP_DEFAULT],
  366                         orderless_lists=True))
  367 
  368         check_and_create.assert_called_once_with(mock.ANY)
  369 
  370     def test_repair_default_security_group(self):
  371         self.db_api.add_item.return_value = fakes.DB_SECURITY_GROUP_1
  372         self.neutron.create_security_group.return_value = (
  373             {'security_group': copy.deepcopy(fakes.OS_SECURITY_GROUP_1)})
  374         self.set_mock_db_items(fakes.DB_VPC_1,
  375                                fakes.DB_SECURITY_GROUP_1,
  376                                fakes.DB_SECURITY_GROUP_2)
  377         self.neutron.list_security_groups.return_value = (
  378             {'security_groups': [fakes.OS_SECURITY_GROUP_2]})
  379 
  380         resp = self.execute('DescribeSecurityGroups', {})
  381         self.db_api.restore_item.assert_called_once_with(
  382             mock.ANY, 'sg',
  383             {'id': fakes.ID_EC2_VPC_1.replace('vpc', 'sg'),
  384              'os_id': fakes.ID_OS_SECURITY_GROUP_1,
  385              'vpc_id': fakes.ID_EC2_VPC_1})
  386         secgroup_body = (
  387             {'security_group': {'name': fakes.ID_EC2_VPC_1,
  388                                 'description': 'Default VPC security group'}})
  389         self.neutron.create_security_group.assert_called_once_with(
  390             secgroup_body)
  391 
  392     def test_authorize_security_group_invalid(self):
  393 
  394         def check_response(error_code, protocol, from_port, to_port, cidr,
  395                            group_id=fakes.ID_EC2_SECURITY_GROUP_2):
  396             params = {'IpPermissions.1.FromPort': str(from_port),
  397                       'IpPermissions.1.ToPort': str(to_port),
  398                       'IpPermissions.1.IpProtocol': protocol}
  399             if group_id is not None:
  400                 params['GroupId'] = group_id
  401             if cidr is not None:
  402                 params['IpPermissions.1.IpRanges.1.CidrIp'] = cidr
  403             self.assert_execution_error(
  404                 error_code, 'AuthorizeSecurityGroupIngress', params)
  405             self.neutron.reset_mock()
  406             self.db_api.reset_mock()
  407 
  408         self.execute(
  409             'AuthorizeSecurityGroupIngress',
  410             {'GroupId': fakes.ID_EC2_SECURITY_GROUP_2,
  411              'IpPermissions.1.FromPort': '-1',
  412              'IpPermissions.1.ToPort': '-1',
  413              'IpPermissions.1.IpProtocol': 'icmp',
  414              'IpPermissions.1.IpRanges.1.CidrIp': '0.0.0.0/0'})
  415         # Duplicate rule
  416         self.set_mock_db_items(fakes.DB_SECURITY_GROUP_1,
  417                                fakes.DB_SECURITY_GROUP_2)
  418         self.neutron.create_security_group_rule.side_effect = (
  419             neutron_exception.Conflict)
  420         check_response('InvalidPermission.Duplicate', 'icmp',
  421                        -1, -1, '0.0.0.0/0')
  422         # Over quota
  423         self.neutron.create_security_group_rule.side_effect = (
  424             neutron_exception.OverQuotaClient)
  425         check_response('RulesPerSecurityGroupLimitExceeded', 'icmp', -1, -1,
  426                        '0.0.0.0/0')
  427         # Invalid CIDR address
  428         check_response('InvalidParameterValue', 'tcp', 80, 81, '0.0.0.0/0444')
  429         # Missing ports
  430         check_response('InvalidParameterValue', 'tcp', -1, -1, '0.0.0.0/0')
  431         # from port cannot be greater than to port
  432         check_response('InvalidParameterValue', 'tcp', 100, 1, '0.0.0.0/0')
  433         # For tcp, negative values are not allowed
  434         check_response('InvalidParameterValue', 'tcp', -1, 1, '0.0.0.0/0')
  435         # For tcp, valid port range 1-65535
  436         check_response('InvalidParameterValue', 'tcp', 1, 65599, '0.0.0.0/0')
  437         # Invalid protocol
  438         check_response('InvalidParameterValue', 'xyz', 1, 14, '0.0.0.0/0')
  439         # Invalid port
  440         check_response('InvalidParameterValue', 'tcp', " ", "gg", '0.0.0.0/0')
  441         # Invalid icmp port
  442         check_response('InvalidParameterValue', 'icmp', " ", "gg", '0.0.0.0/0')
  443         # Invalid CIDR Address
  444         check_response('InvalidParameterValue', 'icmp', -1, -1, '0.0.0.0')
  445         # Invalid CIDR Address
  446         check_response('InvalidParameterValue', 'icmp', 5, 10, '0.0.0.0/')
  447         # Invalid Cidr ports
  448         check_response('InvalidParameterValue', 'icmp', 1, 256, '0.0.0.0/0')
  449         # Missing group
  450         check_response('MissingParameter', 'tcp', 1, 255, '0.0.0.0/0', None)
  451         # Missing cidr
  452         check_response('MissingParameter', 'tcp', 1, 255, None)
  453         # Invalid remote group
  454         self.assert_execution_error(
  455             'InvalidGroup.NotFound', 'AuthorizeSecurityGroupIngress',
  456             {'GroupId': fakes.ID_EC2_SECURITY_GROUP_2,
  457              'IpPermissions.1.IpProtocol': 'icmp',
  458              'IpPermissions.1.Groups.1.GroupName': 'somegroup',
  459              'IpPermissions.1.Groups.1.UserId': 'i-99999999'})
  460 
  461     def test_authorize_security_group_ingress_ip_ranges(self):
  462         self.set_mock_db_items(fakes.DB_SECURITY_GROUP_1,
  463                                fakes.DB_SECURITY_GROUP_2)
  464         self.neutron.create_security_group_rule.return_value = (
  465             {'security_group_rule': [fakes.OS_SECURITY_GROUP_RULE_1]})
  466         self.execute(
  467             'AuthorizeSecurityGroupIngress',
  468             {'GroupId': fakes.ID_EC2_SECURITY_GROUP_2,
  469              'IpPermissions.1.FromPort': '10',
  470              'IpPermissions.1.ToPort': '10',
  471              'IpPermissions.1.IpProtocol': 'tcp',
  472              'IpPermissions.1.IpRanges.1.CidrIp': '192.168.1.0/24'})
  473         self.neutron.create_security_group_rule.assert_called_once_with(
  474             {'security_group_rule':
  475              tools.purge_dict(fakes.OS_SECURITY_GROUP_RULE_1,
  476                               {'id', 'remote_group_id', 'tenant_id'})})
  477         # NOTE(Alex): Openstack extension, AWS-incompability
  478         # IPv6 is not supported by Amazon.
  479         self.execute(
  480             'AuthorizeSecurityGroupIngress',
  481             {'GroupId': fakes.ID_EC2_SECURITY_GROUP_2,
  482              'IpPermissions.1.FromPort': '10',
  483              'IpPermissions.1.ToPort': '10',
  484              'IpPermissions.1.IpProtocol': 'tcp',
  485              'IpPermissions.1.IpRanges.1.CidrIp': '::/0'})
  486         self.neutron.create_security_group_rule.assert_called_with(
  487             {'security_group_rule':
  488              tools.patch_dict(
  489                  fakes.OS_SECURITY_GROUP_RULE_1, {'remote_ip_prefix': '::/0'},
  490                  {'id', 'remote_group_id', 'tenant_id'})})
  491 
  492         self.configure(disable_ec2_classic=True)
  493         self.add_mock_db_items(fakes.DB_VPC_DEFAULT,
  494                                fakes.DB_SECURITY_GROUP_4,
  495                                fakes.DB_SECURITY_GROUP_5,
  496                                fakes.DB_SECURITY_GROUP_6)
  497         self.neutron.list_security_groups.return_value = (
  498             {'security_groups': [fakes.OS_SECURITY_GROUP_4,
  499                                  fakes.OS_SECURITY_GROUP_5]})
  500 
  501         self.execute(
  502             'AuthorizeSecurityGroupIngress',
  503             {'GroupName': 'groupname2',
  504              'IpPermissions.1.FromPort': '10',
  505              'IpPermissions.1.ToPort': '10',
  506              'IpPermissions.1.IpProtocol': 'tcp',
  507              'IpPermissions.1.IpRanges.1.CidrIp': '::/0'})
  508         security_group_rule = {
  509             'direction': 'ingress',
  510             'ethertype': 'IPv4',
  511             'port_range_min': 10,
  512             'port_range_max': 10,
  513             'protocol': 'tcp',
  514             'remote_ip_prefix': '::/0',
  515             'security_group_id': fakes.ID_OS_SECURITY_GROUP_5}
  516         self.neutron.create_security_group_rule.assert_called_with(
  517             {'security_group_rule': security_group_rule})
  518 
  519     def test_authorize_security_group_egress_groups(self):
  520         self.set_mock_db_items(fakes.DB_SECURITY_GROUP_1,
  521                                fakes.DB_SECURITY_GROUP_2)
  522         self.neutron.create_security_group_rule.return_value = (
  523             {'security_group_rule': [fakes.OS_SECURITY_GROUP_RULE_1]})
  524         self.execute(
  525             'AuthorizeSecurityGroupEgress',
  526             {'GroupId': fakes.ID_EC2_SECURITY_GROUP_2,
  527              'IpPermissions.1.FromPort': '10',
  528              'IpPermissions.1.IpProtocol': '100',
  529              'IpPermissions.1.Groups.1.GroupId':
  530              fakes.ID_EC2_SECURITY_GROUP_1})
  531         self.neutron.create_security_group_rule.assert_called_once_with(
  532             {'security_group_rule':
  533              tools.purge_dict(fakes.OS_SECURITY_GROUP_RULE_2,
  534                               {'id', 'remote_ip_prefix', 'tenant_id',
  535                                'port_range_max'})})
  536 
  537     def test_revoke_security_group_ingress_ip_ranges(self):
  538         self.set_mock_db_items(fakes.DB_SECURITY_GROUP_1,
  539                                fakes.DB_SECURITY_GROUP_2)
  540         self.neutron.show_security_group.return_value = {
  541             'security_group': fakes.OS_SECURITY_GROUP_2}
  542         self.neutron.delete_security_group_rule.return_value = True
  543         self.execute(
  544             'RevokeSecurityGroupIngress',
  545             {'GroupId': fakes.ID_EC2_SECURITY_GROUP_2,
  546              'IpPermissions.1.FromPort': '10',
  547              'IpPermissions.1.ToPort': '10',
  548              'IpPermissions.1.IpProtocol': 'tcp',
  549              'IpPermissions.1.IpRanges.1.CidrIp': '192.168.1.0/24'})
  550         self.neutron.show_security_group.assert_called_once_with(
  551             fakes.ID_OS_SECURITY_GROUP_2)
  552         self.neutron.delete_security_group_rule.assert_called_once_with(
  553             fakes.OS_SECURITY_GROUP_RULE_1['id'])
  554 
  555     def test_revoke_security_group_egress_groups(self):
  556         self.set_mock_db_items(fakes.DB_SECURITY_GROUP_1,
  557                                fakes.DB_SECURITY_GROUP_2)
  558         self.neutron.show_security_group.return_value = {
  559             'security_group': fakes.OS_SECURITY_GROUP_2}
  560         self.neutron.delete_security_group_rule.return_value = True
  561         self.execute(
  562             'RevokeSecurityGroupEgress',
  563             {'GroupId': fakes.ID_EC2_SECURITY_GROUP_2,
  564              'IpPermissions.1.FromPort': '10',
  565              'IpPermissions.1.IpProtocol': '100',
  566              'IpPermissions.1.Groups.1.GroupId':
  567              fakes.ID_EC2_SECURITY_GROUP_1})
  568         self.neutron.show_security_group.assert_called_once_with(
  569             fakes.ID_OS_SECURITY_GROUP_2)
  570         self.neutron.delete_security_group_rule.assert_called_once_with(
  571             fakes.OS_SECURITY_GROUP_RULE_2['id'])