"Fossies" - the Fresh Open Source Software Archive

Member "octavia-8.0.0/octavia/tests/unit/api/drivers/amphora_driver/v1/test_driver.py" (14 Apr 2021, 37491 Bytes) of package /linux/misc/openstack/octavia-8.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page as HTML, using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here.

    1 #    Copyright 2018 Rackspace, US Inc.
    2 #
    3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 #    not use this file except in compliance with the License. You may obtain
    5 #    a copy of the License at
    6 #
    7 #         http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 #    Unless required by applicable law or agreed to in writing, software
   10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 #    License for the specific language governing permissions and limitations
   13 #    under the License.
   14 from unittest import mock
   15 
   16 from octavia_lib.api.drivers import data_models as driver_dm
   17 from octavia_lib.api.drivers import exceptions
   18 from oslo_utils import uuidutils
   19 
   20 from octavia.api.drivers.amphora_driver.v1 import driver
   21 from octavia.common import constants as consts
   22 from octavia.network import base as network_base
   23 from octavia.tests.common import sample_data_models
   24 from octavia.tests.unit import base
   25 
   26 
   27 class TestAmphoraDriver(base.TestRpc):
   28     def setUp(self):
   29         super().setUp()
   30         self.amp_driver = driver.AmphoraProviderDriver()
   31         self.sample_data = sample_data_models.SampleDriverDataModels()
   32 
   33     @mock.patch('octavia.common.utils.get_network_driver')
   34     def test_create_vip_port(self, mock_get_net_driver):
   35         mock_net_driver = mock.MagicMock()
   36         mock_get_net_driver.return_value = mock_net_driver
   37         mock_net_driver.allocate_vip.return_value = self.sample_data.db_vip
   38 
   39         provider_vip_dict = self.amp_driver.create_vip_port(
   40             self.sample_data.lb_id, self.sample_data.project_id,
   41             self.sample_data.provider_vip_dict)
   42 
   43         self.assertEqual(self.sample_data.provider_vip_dict, provider_vip_dict)
   44 
   45     @mock.patch('octavia.common.utils.get_network_driver')
   46     def test_create_vip_port_without_port_security_enabled(
   47             self, mock_get_net_driver):
   48         mock_net_driver = mock.MagicMock()
   49         mock_get_net_driver.return_value = mock_net_driver
   50         network = mock.MagicMock()
   51         network.port_security_enabled = False
   52         mock_net_driver.get_network.return_value = network
   53         mock_net_driver.allocate_vip.return_value = self.sample_data.db_vip
   54 
   55         self.assertRaises(exceptions.DriverError,
   56                           self.amp_driver.create_vip_port,
   57                           self.sample_data.lb_id, self.sample_data.project_id,
   58                           self.sample_data.provider_vip_dict)
   59 
   60     @mock.patch('octavia.common.utils.get_network_driver')
   61     def test_create_vip_port_failed(self, mock_get_net_driver):
   62         mock_net_driver = mock.MagicMock()
   63         mock_get_net_driver.return_value = mock_net_driver
   64         mock_net_driver.allocate_vip.side_effect = (
   65             network_base.AllocateVIPException())
   66 
   67         self.assertRaises(exceptions.DriverError,
   68                           self.amp_driver.create_vip_port,
   69                           self.sample_data.lb_id, self.sample_data.project_id,
   70                           self.sample_data.provider_vip_dict)
   71 
   72     # Load Balancer
   73     @mock.patch('oslo_messaging.RPCClient.cast')
   74     def test_loadbalancer_create(self, mock_cast):
   75         provider_lb = driver_dm.LoadBalancer(
   76             loadbalancer_id=self.sample_data.lb_id)
   77         self.amp_driver.loadbalancer_create(provider_lb)
   78         payload = {consts.LOAD_BALANCER_ID: self.sample_data.lb_id,
   79                    consts.FLAVOR: None,
   80                    consts.AVAILABILITY_ZONE: None}
   81         mock_cast.assert_called_with({}, 'create_load_balancer', **payload)
   82 
   83     @mock.patch('oslo_messaging.RPCClient.cast')
   84     def test_loadbalancer_delete(self, mock_cast):
   85         provider_lb = driver_dm.LoadBalancer(
   86             loadbalancer_id=self.sample_data.lb_id)
   87         self.amp_driver.loadbalancer_delete(provider_lb)
   88         payload = {consts.LOAD_BALANCER_ID: self.sample_data.lb_id,
   89                    'cascade': False}
   90         mock_cast.assert_called_with({}, 'delete_load_balancer', **payload)
   91 
   92     @mock.patch('oslo_messaging.RPCClient.cast')
   93     def test_loadbalancer_failover(self, mock_cast):
   94         self.amp_driver.loadbalancer_failover(self.sample_data.lb_id)
   95         payload = {consts.LOAD_BALANCER_ID: self.sample_data.lb_id}
   96         mock_cast.assert_called_with({}, 'failover_load_balancer', **payload)
   97 
   98     @mock.patch('oslo_messaging.RPCClient.cast')
   99     def test_loadbalancer_update(self, mock_cast):
  100         old_provider_lb = driver_dm.LoadBalancer(
  101             loadbalancer_id=self.sample_data.lb_id)
  102         provider_lb = driver_dm.LoadBalancer(
  103             loadbalancer_id=self.sample_data.lb_id, admin_state_up=True)
  104         lb_dict = {'enabled': True}
  105         self.amp_driver.loadbalancer_update(old_provider_lb, provider_lb)
  106         payload = {consts.LOAD_BALANCER_ID: self.sample_data.lb_id,
  107                    consts.LOAD_BALANCER_UPDATES: lb_dict}
  108         mock_cast.assert_called_with({}, 'update_load_balancer', **payload)
  109 
  110     @mock.patch('oslo_messaging.RPCClient.cast')
  111     def test_loadbalancer_update_name(self, mock_cast):
  112         old_provider_lb = driver_dm.LoadBalancer(
  113             loadbalancer_id=self.sample_data.lb_id)
  114         provider_lb = driver_dm.LoadBalancer(
  115             loadbalancer_id=self.sample_data.lb_id, name='Great LB')
  116         lb_dict = {'name': 'Great LB'}
  117         self.amp_driver.loadbalancer_update(old_provider_lb, provider_lb)
  118         payload = {consts.LOAD_BALANCER_ID: self.sample_data.lb_id,
  119                    consts.LOAD_BALANCER_UPDATES: lb_dict}
  120         mock_cast.assert_called_with({}, 'update_load_balancer', **payload)
  121 
  122     @mock.patch('oslo_messaging.RPCClient.cast')
  123     def test_loadbalancer_update_qos(self, mock_cast):
  124         qos_policy_id = uuidutils.generate_uuid()
  125         old_provider_lb = driver_dm.LoadBalancer(
  126             loadbalancer_id=self.sample_data.lb_id)
  127         provider_lb = driver_dm.LoadBalancer(
  128             loadbalancer_id=self.sample_data.lb_id,
  129             vip_qos_policy_id=qos_policy_id)
  130         lb_dict = {'vip': {'qos_policy_id': qos_policy_id}}
  131         self.amp_driver.loadbalancer_update(old_provider_lb, provider_lb)
  132         payload = {consts.LOAD_BALANCER_ID: self.sample_data.lb_id,
  133                    consts.LOAD_BALANCER_UPDATES: lb_dict}
  134         mock_cast.assert_called_with({}, 'update_load_balancer', **payload)
  135 
  136     # Listener
  137     @mock.patch('oslo_messaging.RPCClient.cast')
  138     def test_listener_create(self, mock_cast):
  139         provider_listener = driver_dm.Listener(
  140             listener_id=self.sample_data.listener1_id,
  141             protocol=consts.PROTOCOL_HTTPS,
  142             alpn_protocols=consts.AMPHORA_SUPPORTED_ALPN_PROTOCOLS)
  143         self.amp_driver.listener_create(provider_listener)
  144         payload = {consts.LISTENER_ID: self.sample_data.listener1_id}
  145         mock_cast.assert_called_with({}, 'create_listener', **payload)
  146 
  147     @mock.patch('oslo_messaging.RPCClient.cast')
  148     def test_listener_create_unsupported_alpn(self, mock_cast):
  149         provider_listener = driver_dm.Listener(
  150             listener_id=self.sample_data.listener1_id,
  151             protocol=consts.PROTOCOL_HTTPS)
  152         provider_listener.alpn_protocols = ['http/1.1', 'eureka']
  153         self.assertRaises(
  154             exceptions.UnsupportedOptionError,
  155             self.amp_driver.listener_create,
  156             provider_listener)
  157         mock_cast.assert_not_called()
  158 
  159     @mock.patch('oslo_messaging.RPCClient.cast')
  160     def test_listener_create_unsupported_protocol(self, mock_cast):
  161         provider_listener = driver_dm.Listener(
  162             listener_id=self.sample_data.listener1_id,
  163             protocol='UNSUPPORTED_PROTO')
  164         self.assertRaises(
  165             exceptions.UnsupportedOptionError,
  166             self.amp_driver.listener_create,
  167             provider_listener)
  168         mock_cast.assert_not_called()
  169 
  170     @mock.patch('oslo_messaging.RPCClient.cast')
  171     def test_listener_delete(self, mock_cast):
  172         provider_listener = driver_dm.Listener(
  173             listener_id=self.sample_data.listener1_id)
  174         self.amp_driver.listener_delete(provider_listener)
  175         payload = {consts.LISTENER_ID: self.sample_data.listener1_id}
  176         mock_cast.assert_called_with({}, 'delete_listener', **payload)
  177 
  178     @mock.patch('oslo_messaging.RPCClient.cast')
  179     def test_listener_update(self, mock_cast):
  180         old_provider_listener = driver_dm.Listener(
  181             listener_id=self.sample_data.listener1_id)
  182         provider_listener = driver_dm.Listener(
  183             listener_id=self.sample_data.listener1_id, admin_state_up=False)
  184         listener_dict = {'enabled': False}
  185         self.amp_driver.listener_update(old_provider_listener,
  186                                         provider_listener)
  187         payload = {consts.LISTENER_ID: self.sample_data.listener1_id,
  188                    consts.LISTENER_UPDATES: listener_dict}
  189         mock_cast.assert_called_with({}, 'update_listener', **payload)
  190 
  191     @mock.patch('oslo_messaging.RPCClient.cast')
  192     def test_listener_update_name(self, mock_cast):
  193         old_provider_listener = driver_dm.Listener(
  194             listener_id=self.sample_data.listener1_id)
  195         provider_listener = driver_dm.Listener(
  196             listener_id=self.sample_data.listener1_id, name='Great Listener')
  197         listener_dict = {'name': 'Great Listener'}
  198         self.amp_driver.listener_update(old_provider_listener,
  199                                         provider_listener)
  200         payload = {consts.LISTENER_ID: self.sample_data.listener1_id,
  201                    consts.LISTENER_UPDATES: listener_dict}
  202         mock_cast.assert_called_with({}, 'update_listener', **payload)
  203 
  204     @mock.patch('oslo_messaging.RPCClient.cast')
  205     def test_listener_update_unsupported_alpn(self, mock_cast):
  206         old_provider_listener = driver_dm.Listener(
  207             listener_id=self.sample_data.listener1_id)
  208         provider_listener = driver_dm.Listener(
  209             listener_id=self.sample_data.listener1_id,
  210             alpn_protocols=['http/1.1', 'eureka'])
  211         self.assertRaises(
  212             exceptions.UnsupportedOptionError,
  213             self.amp_driver.listener_update,
  214             old_provider_listener,
  215             provider_listener)
  216 
  217     # Pool
  218     @mock.patch('oslo_messaging.RPCClient.cast')
  219     def test_pool_create(self, mock_cast):
  220         provider_pool = driver_dm.Pool(
  221             pool_id=self.sample_data.pool1_id,
  222             lb_algorithm=consts.LB_ALGORITHM_ROUND_ROBIN,
  223             alpn_protocols=consts.AMPHORA_SUPPORTED_ALPN_PROTOCOLS)
  224         self.amp_driver.pool_create(provider_pool)
  225         payload = {consts.POOL_ID: self.sample_data.pool1_id}
  226         mock_cast.assert_called_with({}, 'create_pool', **payload)
  227 
  228     @mock.patch('oslo_messaging.RPCClient.cast')
  229     def test_pool_create_unsupported_algorithm(self, mock_cast):
  230         provider_pool = driver_dm.Pool(
  231             pool_id=self.sample_data.pool1_id)
  232         provider_pool.lb_algorithm = 'foo'
  233         self.assertRaises(
  234             exceptions.UnsupportedOptionError,
  235             self.amp_driver.pool_create,
  236             provider_pool)
  237         mock_cast.assert_not_called()
  238 
  239     @mock.patch('oslo_messaging.RPCClient.cast')
  240     def test_pool_create_unsupported_alpn(self, mock_cast):
  241         provider_pool = driver_dm.Pool(pool_id=self.sample_data.pool1_id)
  242         provider_pool.alpn_protocols = ['http/1.1', 'eureka']
  243         self.assertRaises(
  244             exceptions.UnsupportedOptionError,
  245             self.amp_driver.pool_create,
  246             provider_pool)
  247         mock_cast.assert_not_called()
  248 
  249     @mock.patch('oslo_messaging.RPCClient.cast')
  250     def test_pool_delete(self, mock_cast):
  251         provider_pool = driver_dm.Pool(
  252             pool_id=self.sample_data.pool1_id)
  253         self.amp_driver.pool_delete(provider_pool)
  254         payload = {consts.POOL_ID: self.sample_data.pool1_id}
  255         mock_cast.assert_called_with({}, 'delete_pool', **payload)
  256 
  257     @mock.patch('oslo_messaging.RPCClient.cast')
  258     def test_pool_update(self, mock_cast):
  259         old_provider_pool = driver_dm.Pool(
  260             pool_id=self.sample_data.pool1_id)
  261         provider_pool = driver_dm.Pool(
  262             pool_id=self.sample_data.pool1_id, admin_state_up=True,
  263             ca_tls_container_data='CA DATA', ca_tls_container_ref='CA REF',
  264             crl_container_data='CRL DATA', crl_container_ref='CRL REF',
  265             description='TEST DESCRIPTION', name='TEST NAME',
  266             lb_algorithm=consts.LB_ALGORITHM_SOURCE_IP,
  267             session_persistence='FAKE SP', tls_container_data='TLS DATA',
  268             tls_container_ref='TLS REF', tls_enabled=False)
  269         pool_dict = {'description': 'TEST DESCRIPTION',
  270                      'lb_algorithm': 'SOURCE_IP', 'name': 'TEST NAME',
  271                      'session_persistence': 'FAKE SP', 'tls_enabled': False,
  272                      'enabled': True, 'tls_certificate_id': 'TLS REF',
  273                      'ca_tls_certificate_id': 'CA REF',
  274                      'crl_container_id': 'CRL REF'}
  275         self.amp_driver.pool_update(old_provider_pool, provider_pool)
  276         payload = {consts.POOL_ID: self.sample_data.pool1_id,
  277                    consts.POOL_UPDATES: pool_dict}
  278         mock_cast.assert_called_with({}, 'update_pool', **payload)
  279 
  280     @mock.patch('oslo_messaging.RPCClient.cast')
  281     def test_pool_update_name(self, mock_cast):
  282         old_provider_pool = driver_dm.Pool(
  283             pool_id=self.sample_data.pool1_id)
  284         provider_pool = driver_dm.Pool(
  285             pool_id=self.sample_data.pool1_id, name='Great pool',
  286             admin_state_up=True, tls_enabled=True)
  287         pool_dict = {'name': 'Great pool',
  288                      'enabled': True,
  289                      'tls_enabled': True}
  290         self.amp_driver.pool_update(old_provider_pool, provider_pool)
  291         payload = {consts.POOL_ID: self.sample_data.pool1_id,
  292                    consts.POOL_UPDATES: pool_dict}
  293         mock_cast.assert_called_with({}, 'update_pool', **payload)
  294 
  295     @mock.patch('oslo_messaging.RPCClient.cast')
  296     def test_pool_update_unsupported_algorithm(self, mock_cast):
  297         old_provider_pool = driver_dm.Pool(
  298             pool_id=self.sample_data.pool1_id)
  299         provider_pool = driver_dm.Pool(
  300             pool_id=self.sample_data.pool1_id)
  301         provider_pool.lb_algorithm = 'foo'
  302         self.assertRaises(
  303             exceptions.UnsupportedOptionError,
  304             self.amp_driver.pool_update,
  305             old_provider_pool,
  306             provider_pool)
  307         mock_cast.assert_not_called()
  308 
  309     @mock.patch('oslo_messaging.RPCClient.cast')
  310     def test_pool_update_unsupported_alpn(self, mock_cast):
  311         old_provider_pool = driver_dm.Pool(pool_id=self.sample_data.pool1_id)
  312         provider_pool = driver_dm.Pool(
  313             listener_id=self.sample_data.pool1_id,
  314             alpn_protocols=['http/1.1', 'eureka'])
  315         self.assertRaises(
  316             exceptions.UnsupportedOptionError,
  317             self.amp_driver.pool_update,
  318             old_provider_pool,
  319             provider_pool)
  320 
  321     # Member
  322     @mock.patch('octavia.db.api.get_session')
  323     @mock.patch('octavia.db.repositories.PoolRepository.get')
  324     @mock.patch('oslo_messaging.RPCClient.cast')
  325     def test_member_create(self, mock_cast, mock_pool_get, mock_session):
  326         provider_member = driver_dm.Member(
  327             member_id=self.sample_data.member1_id)
  328         self.amp_driver.member_create(provider_member)
  329         payload = {consts.MEMBER_ID: self.sample_data.member1_id}
  330         mock_cast.assert_called_with({}, 'create_member', **payload)
  331 
  332     @mock.patch('octavia.db.api.get_session')
  333     @mock.patch('octavia.db.repositories.PoolRepository.get')
  334     @mock.patch('oslo_messaging.RPCClient.cast')
  335     def test_member_create_udp_ipv4(self, mock_cast, mock_pool_get,
  336                                     mock_session):
  337         mock_lb = mock.MagicMock()
  338         mock_lb.vip = mock.MagicMock()
  339         mock_lb.vip.ip_address = "192.0.1.1"
  340         mock_listener = mock.MagicMock()
  341         mock_listener.load_balancer = mock_lb
  342         mock_pool = mock.MagicMock()
  343         mock_pool.protocol = consts.PROTOCOL_UDP
  344         mock_pool.listeners = [mock_listener]
  345         mock_pool_get.return_value = mock_pool
  346 
  347         provider_member = driver_dm.Member(
  348             member_id=self.sample_data.member1_id,
  349             address="192.0.2.1")
  350         self.amp_driver.member_create(provider_member)
  351         payload = {consts.MEMBER_ID: self.sample_data.member1_id}
  352         mock_cast.assert_called_with({}, 'create_member', **payload)
  353 
  354     @mock.patch('octavia.db.api.get_session')
  355     @mock.patch('octavia.db.repositories.PoolRepository.get')
  356     @mock.patch('oslo_messaging.RPCClient.cast')
  357     def test_member_create_udp_ipv4_ipv6(self, mock_cast, mock_pool_get,
  358                                          mock_session):
  359         mock_lb = mock.MagicMock()
  360         mock_lb.vip = mock.MagicMock()
  361         mock_lb.vip.ip_address = "fe80::1"
  362         mock_listener = mock.MagicMock()
  363         mock_listener.load_balancer = mock_lb
  364         mock_pool = mock.MagicMock()
  365         mock_pool.protocol = consts.PROTOCOL_UDP
  366         mock_pool.listeners = [mock_listener]
  367         mock_pool_get.return_value = mock_pool
  368 
  369         provider_member = driver_dm.Member(
  370             member_id=self.sample_data.member1_id,
  371             address="192.0.2.1")
  372         self.assertRaises(exceptions.UnsupportedOptionError,
  373                           self.amp_driver.member_create,
  374                           provider_member)
  375 
  376     @mock.patch('oslo_messaging.RPCClient.cast')
  377     def test_member_delete(self, mock_cast):
  378         provider_member = driver_dm.Member(
  379             member_id=self.sample_data.member1_id)
  380         self.amp_driver.member_delete(provider_member)
  381         payload = {consts.MEMBER_ID: self.sample_data.member1_id}
  382         mock_cast.assert_called_with({}, 'delete_member', **payload)
  383 
  384     @mock.patch('oslo_messaging.RPCClient.cast')
  385     def test_member_update(self, mock_cast):
  386         old_provider_member = driver_dm.Member(
  387             member_id=self.sample_data.member1_id)
  388         provider_member = driver_dm.Member(
  389             member_id=self.sample_data.member1_id, admin_state_up=True)
  390         member_dict = {'enabled': True}
  391         self.amp_driver.member_update(old_provider_member, provider_member)
  392         payload = {consts.MEMBER_ID: self.sample_data.member1_id,
  393                    consts.MEMBER_UPDATES: member_dict}
  394         mock_cast.assert_called_with({}, 'update_member', **payload)
  395 
  396     @mock.patch('oslo_messaging.RPCClient.cast')
  397     def test_member_update_name(self, mock_cast):
  398         old_provider_member = driver_dm.Member(
  399             member_id=self.sample_data.member1_id)
  400         provider_member = driver_dm.Member(
  401             member_id=self.sample_data.member1_id, name='Great member')
  402         member_dict = {'name': 'Great member'}
  403         self.amp_driver.member_update(old_provider_member, provider_member)
  404         payload = {consts.MEMBER_ID: self.sample_data.member1_id,
  405                    consts.MEMBER_UPDATES: member_dict}
  406         mock_cast.assert_called_with({}, 'update_member', **payload)
  407 
  408     @mock.patch('octavia.db.api.get_session')
  409     @mock.patch('octavia.db.repositories.PoolRepository.get')
  410     @mock.patch('oslo_messaging.RPCClient.cast')
  411     def test_member_batch_update(self, mock_cast, mock_pool_get, mock_session):
  412         mock_pool = mock.MagicMock()
  413         mock_pool.members = self.sample_data.db_pool1_members
  414         mock_pool_get.return_value = mock_pool
  415 
  416         prov_mem_update = driver_dm.Member(
  417             member_id=self.sample_data.member2_id,
  418             pool_id=self.sample_data.pool1_id, admin_state_up=False,
  419             address='192.0.2.17', monitor_address='192.0.2.77',
  420             protocol_port=80, name='updated-member2')
  421         prov_new_member = driver_dm.Member(
  422             member_id=self.sample_data.member3_id,
  423             pool_id=self.sample_data.pool1_id,
  424             address='192.0.2.18', monitor_address='192.0.2.28',
  425             protocol_port=80, name='member3')
  426         prov_members = [prov_mem_update, prov_new_member]
  427 
  428         update_mem_dict = {'ip_address': '192.0.2.17',
  429                            'name': 'updated-member2',
  430                            'monitor_address': '192.0.2.77',
  431                            'id': self.sample_data.member2_id,
  432                            'enabled': False,
  433                            'protocol_port': 80,
  434                            'pool_id': self.sample_data.pool1_id}
  435 
  436         self.amp_driver.member_batch_update(
  437             self.sample_data.pool1_id, prov_members)
  438 
  439         payload = {'old_member_ids': [self.sample_data.member1_id],
  440                    'new_member_ids': [self.sample_data.member3_id],
  441                    'updated_members': [update_mem_dict]}
  442         mock_cast.assert_called_with({}, 'batch_update_members', **payload)
  443 
  444     @mock.patch('octavia.db.api.get_session')
  445     @mock.patch('octavia.db.repositories.PoolRepository.get')
  446     @mock.patch('oslo_messaging.RPCClient.cast')
  447     def test_member_batch_update_no_admin_addr(self, mock_cast,
  448                                                mock_pool_get, mock_session):
  449         mock_pool = mock.MagicMock()
  450         mock_pool.members = self.sample_data.db_pool1_members
  451         mock_pool_get.return_value = mock_pool
  452 
  453         prov_mem_update = driver_dm.Member(
  454             member_id=self.sample_data.member2_id,
  455             pool_id=self.sample_data.pool1_id,
  456             monitor_address='192.0.2.77',
  457             protocol_port=80, name='updated-member2')
  458         prov_new_member = driver_dm.Member(
  459             member_id=self.sample_data.member3_id,
  460             pool_id=self.sample_data.pool1_id,
  461             address='192.0.2.18', monitor_address='192.0.2.28',
  462             protocol_port=80, name='member3')
  463         prov_members = [prov_mem_update, prov_new_member]
  464 
  465         update_mem_dict = {'name': 'updated-member2',
  466                            'monitor_address': '192.0.2.77',
  467                            'id': self.sample_data.member2_id,
  468                            'protocol_port': 80,
  469                            'pool_id': self.sample_data.pool1_id}
  470 
  471         self.amp_driver.member_batch_update(
  472             self.sample_data.pool1_id, prov_members)
  473 
  474         payload = {'old_member_ids': [self.sample_data.member1_id],
  475                    'new_member_ids': [self.sample_data.member3_id],
  476                    'updated_members': [update_mem_dict]}
  477         mock_cast.assert_called_with({}, 'batch_update_members', **payload)
  478 
  479     @mock.patch('octavia.db.api.get_session')
  480     @mock.patch('octavia.db.repositories.PoolRepository.get')
  481     @mock.patch('oslo_messaging.RPCClient.cast')
  482     def test_member_batch_update_clear_already_empty(
  483             self, mock_cast, mock_pool_get, mock_session):
  484         """Expect that we will pass an empty payload if directed.
  485 
  486         Logic for whether or not to attempt this will be done above the driver
  487         layer, so our driver is responsible to forward the request even if it
  488         is a perceived no-op.
  489         """
  490         mock_pool = mock.MagicMock()
  491         mock_pool_get.return_value = mock_pool
  492 
  493         self.amp_driver.member_batch_update(
  494             self.sample_data.pool1_id, [])
  495 
  496         payload = {'old_member_ids': [],
  497                    'new_member_ids': [],
  498                    'updated_members': []}
  499         mock_cast.assert_called_with({}, 'batch_update_members', **payload)
  500 
  501     # Health Monitor
  502     @mock.patch('oslo_messaging.RPCClient.cast')
  503     def test_health_monitor_create(self, mock_cast):
  504         provider_HM = driver_dm.HealthMonitor(
  505             healthmonitor_id=self.sample_data.hm1_id)
  506         self.amp_driver.health_monitor_create(provider_HM)
  507         payload = {consts.HEALTH_MONITOR_ID: self.sample_data.hm1_id}
  508         mock_cast.assert_called_with({}, 'create_health_monitor', **payload)
  509 
  510     @mock.patch('oslo_messaging.RPCClient.cast')
  511     def test_health_monitor_delete(self, mock_cast):
  512         provider_HM = driver_dm.HealthMonitor(
  513             healthmonitor_id=self.sample_data.hm1_id)
  514         self.amp_driver.health_monitor_delete(provider_HM)
  515         payload = {consts.HEALTH_MONITOR_ID: self.sample_data.hm1_id}
  516         mock_cast.assert_called_with({}, 'delete_health_monitor', **payload)
  517 
  518     @mock.patch('octavia.db.api.get_session')
  519     @mock.patch('octavia.db.repositories.PoolRepository.get')
  520     @mock.patch('oslo_messaging.RPCClient.cast')
  521     def test_member_batch_update_udp_ipv4(self, mock_cast, mock_pool_get,
  522                                           mock_session):
  523 
  524         mock_lb = mock.MagicMock()
  525         mock_lb.vip = mock.MagicMock()
  526         mock_lb.vip.ip_address = "192.0.1.1"
  527         mock_listener = mock.MagicMock()
  528         mock_listener.load_balancer = mock_lb
  529         mock_pool = mock.MagicMock()
  530         mock_pool.protocol = consts.PROTOCOL_UDP
  531         mock_pool.listeners = [mock_listener]
  532         mock_pool.members = self.sample_data.db_pool1_members
  533         mock_pool_get.return_value = mock_pool
  534 
  535         prov_mem_update = driver_dm.Member(
  536             member_id=self.sample_data.member2_id,
  537             pool_id=self.sample_data.pool1_id, admin_state_up=False,
  538             address='192.0.2.17', monitor_address='192.0.2.77',
  539             protocol_port=80, name='updated-member2')
  540         prov_new_member = driver_dm.Member(
  541             member_id=self.sample_data.member3_id,
  542             pool_id=self.sample_data.pool1_id,
  543             address='192.0.2.18', monitor_address='192.0.2.28',
  544             protocol_port=80, name='member3')
  545         prov_members = [prov_mem_update, prov_new_member]
  546 
  547         update_mem_dict = {'ip_address': '192.0.2.17',
  548                            'name': 'updated-member2',
  549                            'monitor_address': '192.0.2.77',
  550                            'id': self.sample_data.member2_id,
  551                            'enabled': False,
  552                            'protocol_port': 80,
  553                            'pool_id': self.sample_data.pool1_id}
  554 
  555         self.amp_driver.member_batch_update(
  556             self.sample_data.pool1_id, prov_members)
  557 
  558         payload = {'old_member_ids': [self.sample_data.member1_id],
  559                    'new_member_ids': [self.sample_data.member3_id],
  560                    'updated_members': [update_mem_dict]}
  561         mock_cast.assert_called_with({}, 'batch_update_members', **payload)
  562 
  563     @mock.patch('octavia.db.api.get_session')
  564     @mock.patch('octavia.db.repositories.PoolRepository.get')
  565     @mock.patch('oslo_messaging.RPCClient.cast')
  566     def test_member_batch_update_udp_ipv4_ipv6(self, mock_cast, mock_pool_get,
  567                                                mock_session):
  568 
  569         mock_lb = mock.MagicMock()
  570         mock_lb.vip = mock.MagicMock()
  571         mock_lb.vip.ip_address = "192.0.1.1"
  572         mock_listener = mock.MagicMock()
  573         mock_listener.load_balancer = mock_lb
  574         mock_pool = mock.MagicMock()
  575         mock_pool.protocol = consts.PROTOCOL_UDP
  576         mock_pool.listeners = [mock_listener]
  577         mock_pool.members = self.sample_data.db_pool1_members
  578         mock_pool_get.return_value = mock_pool
  579 
  580         prov_mem_update = driver_dm.Member(
  581             member_id=self.sample_data.member2_id,
  582             pool_id=self.sample_data.pool1_id, admin_state_up=False,
  583             address='fe80::1', monitor_address='fe80::2',
  584             protocol_port=80, name='updated-member2')
  585         prov_new_member = driver_dm.Member(
  586             member_id=self.sample_data.member3_id,
  587             pool_id=self.sample_data.pool1_id,
  588             address='192.0.2.18', monitor_address='192.0.2.28',
  589             protocol_port=80, name='member3')
  590         prov_members = [prov_mem_update, prov_new_member]
  591 
  592         self.assertRaises(exceptions.UnsupportedOptionError,
  593                           self.amp_driver.member_batch_update,
  594                           self.sample_data.pool1_id, prov_members)
  595 
  596     @mock.patch('oslo_messaging.RPCClient.cast')
  597     def test_health_monitor_update(self, mock_cast):
  598         old_provider_hm = driver_dm.HealthMonitor(
  599             healthmonitor_id=self.sample_data.hm1_id)
  600         provider_hm = driver_dm.HealthMonitor(
  601             healthmonitor_id=self.sample_data.hm1_id, admin_state_up=True,
  602             max_retries=1, max_retries_down=2)
  603         hm_dict = {'enabled': True, 'rise_threshold': 1, 'fall_threshold': 2}
  604         self.amp_driver.health_monitor_update(old_provider_hm, provider_hm)
  605         payload = {consts.HEALTH_MONITOR_ID: self.sample_data.hm1_id,
  606                    consts.HEALTH_MONITOR_UPDATES: hm_dict}
  607         mock_cast.assert_called_with({}, 'update_health_monitor', **payload)
  608 
  609     @mock.patch('oslo_messaging.RPCClient.cast')
  610     def test_health_monitor_update_name(self, mock_cast):
  611         old_provider_hm = driver_dm.HealthMonitor(
  612             healthmonitor_id=self.sample_data.hm1_id)
  613         provider_hm = driver_dm.HealthMonitor(
  614             healthmonitor_id=self.sample_data.hm1_id, name='Great HM')
  615         hm_dict = {'name': 'Great HM'}
  616         self.amp_driver.health_monitor_update(old_provider_hm, provider_hm)
  617         payload = {consts.HEALTH_MONITOR_ID: self.sample_data.hm1_id,
  618                    consts.HEALTH_MONITOR_UPDATES: hm_dict}
  619         mock_cast.assert_called_with({}, 'update_health_monitor', **payload)
  620 
  621     # L7 Policy
  622     @mock.patch('oslo_messaging.RPCClient.cast')
  623     def test_l7policy_create(self, mock_cast):
  624         provider_l7policy = driver_dm.L7Policy(
  625             l7policy_id=self.sample_data.l7policy1_id)
  626         self.amp_driver.l7policy_create(provider_l7policy)
  627         payload = {consts.L7POLICY_ID: self.sample_data.l7policy1_id}
  628         mock_cast.assert_called_with({}, 'create_l7policy', **payload)
  629 
  630     @mock.patch('oslo_messaging.RPCClient.cast')
  631     def test_l7policy_delete(self, mock_cast):
  632         provider_l7policy = driver_dm.L7Policy(
  633             l7policy_id=self.sample_data.l7policy1_id)
  634         self.amp_driver.l7policy_delete(provider_l7policy)
  635         payload = {consts.L7POLICY_ID: self.sample_data.l7policy1_id}
  636         mock_cast.assert_called_with({}, 'delete_l7policy', **payload)
  637 
  638     @mock.patch('oslo_messaging.RPCClient.cast')
  639     def test_l7policy_update(self, mock_cast):
  640         old_provider_l7policy = driver_dm.L7Policy(
  641             l7policy_id=self.sample_data.l7policy1_id)
  642         provider_l7policy = driver_dm.L7Policy(
  643             l7policy_id=self.sample_data.l7policy1_id, admin_state_up=True)
  644         l7policy_dict = {'enabled': True}
  645         self.amp_driver.l7policy_update(old_provider_l7policy,
  646                                         provider_l7policy)
  647         payload = {consts.L7POLICY_ID: self.sample_data.l7policy1_id,
  648                    consts.L7POLICY_UPDATES: l7policy_dict}
  649         mock_cast.assert_called_with({}, 'update_l7policy', **payload)
  650 
  651     @mock.patch('oslo_messaging.RPCClient.cast')
  652     def test_l7policy_update_name(self, mock_cast):
  653         old_provider_l7policy = driver_dm.L7Policy(
  654             l7policy_id=self.sample_data.l7policy1_id)
  655         provider_l7policy = driver_dm.L7Policy(
  656             l7policy_id=self.sample_data.l7policy1_id, name='Great L7Policy')
  657         l7policy_dict = {'name': 'Great L7Policy'}
  658         self.amp_driver.l7policy_update(old_provider_l7policy,
  659                                         provider_l7policy)
  660         payload = {consts.L7POLICY_ID: self.sample_data.l7policy1_id,
  661                    consts.L7POLICY_UPDATES: l7policy_dict}
  662         mock_cast.assert_called_with({}, 'update_l7policy', **payload)
  663 
  664     # L7 Rules
  665     @mock.patch('oslo_messaging.RPCClient.cast')
  666     def test_l7rule_create(self, mock_cast):
  667         provider_l7rule = driver_dm.L7Rule(
  668             l7rule_id=self.sample_data.l7rule1_id)
  669         self.amp_driver.l7rule_create(provider_l7rule)
  670         payload = {consts.L7RULE_ID: self.sample_data.l7rule1_id}
  671         mock_cast.assert_called_with({}, 'create_l7rule', **payload)
  672 
  673     @mock.patch('oslo_messaging.RPCClient.cast')
  674     def test_l7rule_delete(self, mock_cast):
  675         provider_l7rule = driver_dm.L7Rule(
  676             l7rule_id=self.sample_data.l7rule1_id)
  677         self.amp_driver.l7rule_delete(provider_l7rule)
  678         payload = {consts.L7RULE_ID: self.sample_data.l7rule1_id}
  679         mock_cast.assert_called_with({}, 'delete_l7rule', **payload)
  680 
  681     @mock.patch('oslo_messaging.RPCClient.cast')
  682     def test_l7rule_update(self, mock_cast):
  683         old_provider_l7rule = driver_dm.L7Rule(
  684             l7rule_id=self.sample_data.l7rule1_id)
  685         provider_l7rule = driver_dm.L7Rule(
  686             l7rule_id=self.sample_data.l7rule1_id, admin_state_up=True)
  687         l7rule_dict = {'enabled': True}
  688         self.amp_driver.l7rule_update(old_provider_l7rule, provider_l7rule)
  689         payload = {consts.L7RULE_ID: self.sample_data.l7rule1_id,
  690                    consts.L7RULE_UPDATES: l7rule_dict}
  691         mock_cast.assert_called_with({}, 'update_l7rule', **payload)
  692 
  693     @mock.patch('oslo_messaging.RPCClient.cast')
  694     def test_l7rule_update_invert(self, mock_cast):
  695         old_provider_l7rule = driver_dm.L7Rule(
  696             l7rule_id=self.sample_data.l7rule1_id)
  697         provider_l7rule = driver_dm.L7Rule(
  698             l7rule_id=self.sample_data.l7rule1_id, invert=True)
  699         l7rule_dict = {'invert': True}
  700         self.amp_driver.l7rule_update(old_provider_l7rule, provider_l7rule)
  701         payload = {consts.L7RULE_ID: self.sample_data.l7rule1_id,
  702                    consts.L7RULE_UPDATES: l7rule_dict}
  703         mock_cast.assert_called_with({}, 'update_l7rule', **payload)
  704 
  705     # Flavor
  706     def test_get_supported_flavor_metadata(self):
  707         test_schema = {
  708             "properties": {
  709                 "test_name": {"description": "Test description"},
  710                 "test_name2": {"description": "Another description"}}}
  711         ref_dict = {"test_name": "Test description",
  712                     "test_name2": "Another description"}
  713 
  714         # mock out the supported_flavor_metadata
  715         with mock.patch('octavia.api.drivers.amphora_driver.flavor_schema.'
  716                         'SUPPORTED_FLAVOR_SCHEMA', test_schema):
  717             result = self.amp_driver.get_supported_flavor_metadata()
  718         self.assertEqual(ref_dict, result)
  719 
  720         # Test for bad schema
  721         with mock.patch('octavia.api.drivers.amphora_driver.flavor_schema.'
  722                         'SUPPORTED_FLAVOR_SCHEMA', 'bogus'):
  723             self.assertRaises(exceptions.DriverError,
  724                               self.amp_driver.get_supported_flavor_metadata)
  725 
  726     def test_validate_flavor(self):
  727         ref_dict = {consts.LOADBALANCER_TOPOLOGY: consts.TOPOLOGY_SINGLE}
  728         self.amp_driver.validate_flavor(ref_dict)
  729 
  730         # Test bad flavor metadata value is bad
  731         ref_dict = {consts.LOADBALANCER_TOPOLOGY: 'bogus'}
  732         self.assertRaises(exceptions.UnsupportedOptionError,
  733                           self.amp_driver.validate_flavor,
  734                           ref_dict)
  735 
  736         # Test bad flavor metadata key
  737         ref_dict = {'bogus': 'bogus'}
  738         self.assertRaises(exceptions.UnsupportedOptionError,
  739                           self.amp_driver.validate_flavor,
  740                           ref_dict)
  741 
  742         # Test for bad schema
  743         with mock.patch('octavia.api.drivers.amphora_driver.flavor_schema.'
  744                         'SUPPORTED_FLAVOR_SCHEMA', 'bogus'):
  745             self.assertRaises(exceptions.DriverError,
  746                               self.amp_driver.validate_flavor, 'bogus')
  747 
  748     # Availability Zone
  749     def test_get_supported_availability_zone_metadata(self):
  750         test_schema = {
  751             "properties": {
  752                 "test_name": {"description": "Test description"},
  753                 "test_name2": {"description": "Another description"}}}
  754         ref_dict = {"test_name": "Test description",
  755                     "test_name2": "Another description"}
  756 
  757         # mock out the supported_availability_zone_metadata
  758         with mock.patch('octavia.api.drivers.amphora_driver.'
  759                         'availability_zone_schema.'
  760                         'SUPPORTED_AVAILABILITY_ZONE_SCHEMA', test_schema):
  761             result = self.amp_driver.get_supported_availability_zone_metadata()
  762         self.assertEqual(ref_dict, result)
  763 
  764         # Test for bad schema
  765         with mock.patch('octavia.api.drivers.amphora_driver.'
  766                         'availability_zone_schema.'
  767                         'SUPPORTED_AVAILABILITY_ZONE_SCHEMA', 'bogus'):
  768             self.assertRaises(
  769                 exceptions.DriverError,
  770                 self.amp_driver.get_supported_availability_zone_metadata)
  771 
  772     def test_validate_availability_zone(self):
  773         with mock.patch('stevedore.driver.DriverManager.driver') as m_driver:
  774             m_driver.validate_availability_zone.return_value = None
  775             ref_dict = {consts.COMPUTE_ZONE: 'my_compute_zone'}
  776             self.amp_driver.validate_availability_zone(ref_dict)
  777 
  778         # Test bad availability zone metadata key
  779         ref_dict = {'bogus': 'bogus'}
  780         self.assertRaises(exceptions.UnsupportedOptionError,
  781                           self.amp_driver.validate_availability_zone,
  782                           ref_dict)
  783 
  784         # Test for bad schema
  785         with mock.patch('octavia.api.drivers.amphora_driver.'
  786                         'availability_zone_schema.'
  787                         'SUPPORTED_AVAILABILITY_ZONE_SCHEMA', 'bogus'):
  788             self.assertRaises(exceptions.DriverError,
  789                               self.amp_driver.validate_availability_zone,
  790                               'bogus')