"Fossies" - the Fresh Open Source Software Archive

Member "horizon-14.0.4/openstack_dashboard/dashboards/admin/networks/tests.py" (22 Oct 2019, 46713 Bytes) of package /linux/misc/openstack/horizon-14.0.4.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the last Fossies "Diffs" side-by-side code changes report for "tests.py": 15.1.0_vs_16.0.0.

    1 # Copyright 2012 NEC Corporation
    2 #
    3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    4 #    not use this file except in compliance with the License. You may obtain
    5 #    a copy of the License at
    6 #
    7 #         http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 #    Unless required by applicable law or agreed to in writing, software
   10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   12 #    License for the specific language governing permissions and limitations
   13 #    under the License.
   14 
   15 import collections
   16 
   17 from django.urls import reverse
   18 from django.utils.http import urlunquote
   19 
   20 import mock
   21 
   22 from horizon import forms
   23 
   24 from openstack_dashboard import api
   25 from openstack_dashboard.dashboards.project.networks import tests
   26 from openstack_dashboard.test import helpers as test
   27 from openstack_dashboard import usage
   28 
   29 INDEX_TEMPLATE = 'horizon/common/_data_table_view.html'
   30 INDEX_URL = reverse('horizon:admin:networks:index')
   31 
   32 
   33 class NetworkTests(test.BaseAdminViewTests):
   34 
   35     def _stub_is_extension_supported(self, features):
   36         self._features = features
   37         self._feature_call_counts = collections.defaultdict(int)
   38 
   39         def fake_extension_supported(request, alias):
   40             self._feature_call_counts[alias] += 1
   41             return self._features[alias]
   42 
   43         self.mock_is_extension_supported.side_effect = fake_extension_supported
   44 
   45     def _check_is_extension_supported(self, expected_count):
   46         self.assertEqual(expected_count, self._feature_call_counts)
   47 
   48     @test.create_mocks({api.neutron: ('network_list',
   49                                       'list_dhcp_agent_hosting_networks',
   50                                       'is_extension_supported'),
   51                         api.keystone: ('tenant_list',),
   52                         usage.quotas: ('tenant_quota_usages',)})
   53     def test_index(self):
   54         tenants = self.tenants.list()
   55         quota_data = self.quota_usages.first()
   56 
   57         self.mock_network_list.return_value = self.networks.list()
   58         self.mock_tenant_list.return_value = [tenants, False]
   59         self._stub_is_extension_supported(
   60             {'network_availability_zone': True,
   61              'dhcp_agent_scheduler': True})
   62         self.mock_tenant_quota_usages.return_value = quota_data
   63         self.mock_list_dhcp_agent_hosting_networks.return_value = \
   64             self.agents.list()
   65 
   66         res = self.client.get(INDEX_URL)
   67 
   68         self.assertTemplateUsed(res, INDEX_TEMPLATE)
   69         networks = res.context['networks_table'].data
   70         self.assertItemsEqual(networks, self.networks.list())
   71 
   72         self.mock_network_list.assert_called_once_with(test.IsHttpRequest())
   73         self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
   74         self._check_is_extension_supported(
   75             {'network_availability_zone': 1,
   76              'dhcp_agent_scheduler': len(self.networks.list()) + 1})
   77         self.mock_tenant_quota_usages.assert_has_calls(
   78             [mock.call(test.IsHttpRequest(), tenant_id=network.tenant_id,
   79                        targets=('subnet', ))
   80              for network in self.networks.list()])
   81         self.assertEqual(len(self.networks.list()),
   82                          self.mock_tenant_quota_usages.call_count)
   83         self.mock_list_dhcp_agent_hosting_networks.assert_has_calls(
   84             [mock.call(test.IsHttpRequest(), network.id)
   85              for network in self.networks.list()])
   86         self.assertEqual(len(self.networks.list()),
   87                          self.mock_list_dhcp_agent_hosting_networks.call_count)
   88 
   89     @test.create_mocks({api.neutron: ('network_list',
   90                                       'is_extension_supported',)})
   91     def test_index_network_list_exception(self):
   92         self.mock_network_list.side_effect = self.exceptions.neutron
   93         self._stub_is_extension_supported(
   94             {'network_availability_zone': True,
   95              'dhcp_agent_scheduler': True})
   96 
   97         res = self.client.get(INDEX_URL)
   98 
   99         self.assertTemplateUsed(res, INDEX_TEMPLATE)
  100         self.assertEqual(len(res.context['networks_table'].data), 0)
  101         self.assertMessageCount(res, error=1)
  102 
  103         self.mock_network_list.assert_called_once_with(test.IsHttpRequest())
  104         self._check_is_extension_supported(
  105             {'network_availability_zone': 1,
  106              'dhcp_agent_scheduler': 1})
  107 
  108     @test.create_mocks({api.neutron: ('network_get',
  109                                       'is_extension_supported'),
  110                         usage.quotas: ('tenant_quota_usages',)})
  111     def test_network_detail_new(self, mac_learning=False):
  112         network = self.networks.first()
  113         quota_data = self.quota_usages.first()
  114 
  115         self.mock_network_get.return_value = network
  116         self.mock_tenant_quota_usages.return_value = quota_data
  117         self._stub_is_extension_supported(
  118             {'network-ip-availability': True,
  119              'network_availability_zone': True,
  120              'mac-learning': mac_learning,
  121              'dhcp_agent_scheduler': True})
  122 
  123         url = urlunquote(reverse('horizon:admin:networks:detail',
  124                                  args=[network.id]))
  125 
  126         res = self.client.get(url)
  127         network = res.context['network']
  128         self.assertEqual(self.networks.first().name_or_id, network.name_or_id)
  129         self.assertEqual(self.networks.first().status_label,
  130                          network.status_label)
  131         self.assertTemplateUsed(res, 'horizon/common/_detail.html')
  132 
  133         self.assert_mock_multiple_calls_with_same_arguments(
  134             self.mock_network_get, 2,
  135             mock.call(test.IsHttpRequest(), network.id))
  136         self.mock_tenant_quota_usages.assert_called_once_with(
  137             test.IsHttpRequest(), tenant_id=network.tenant_id,
  138             targets=('subnet',))
  139         self._check_is_extension_supported(
  140             {'network-ip-availability': 1,
  141              'network_availability_zone': 1,
  142              'mac-learning': 1,
  143              'dhcp_agent_scheduler': 1})
  144 
  145     def test_network_detail_subnets_tab(self):
  146         self._test_network_detail_subnets_tab()
  147 
  148     def test_network_detail_subnets_tab_with_mac_learning(self):
  149         self._test_network_detail_subnets_tab(mac_learning=True)
  150 
  151     @test.create_mocks({api.neutron: ('network_get',
  152                                       'subnet_list',
  153                                       'show_network_ip_availability',
  154                                       'is_extension_supported'),
  155                         usage.quotas: ('tenant_quota_usages',)})
  156     def _test_network_detail_subnets_tab(self, mac_learning=False):
  157         network = self.networks.first()
  158         ip_availability = self.ip_availability.get()
  159         quota_data = self.quota_usages.first()
  160 
  161         self.mock_show_network_ip_availability.return_value = ip_availability
  162         self.mock_network_get.return_value = network
  163         self.mock_subnet_list.return_value = [self.subnets.first()]
  164         self._stub_is_extension_supported(
  165             {'network-ip-availability': True,
  166              'mac-learning': mac_learning,
  167              'network_availability_zone': True,
  168              'dhcp_agent_scheduler': True})
  169         self.mock_tenant_quota_usages.return_value = quota_data
  170 
  171         url = urlunquote(reverse('horizon:admin:networks:subnets_tab',
  172                          args=[network.id]))
  173         res = self.client.get(url)
  174 
  175         self.assertTemplateUsed(res, 'horizon/common/_detail.html')
  176         subnets = res.context['subnets_table'].data
  177         self.assertItemsEqual(subnets, [self.subnets.first()])
  178 
  179         self.mock_show_network_ip_availability.assert_called_once_with(
  180             test.IsHttpRequest(), network.id)
  181         self.assert_mock_multiple_calls_with_same_arguments(
  182             self.mock_network_get, 2,
  183             mock.call(test.IsHttpRequest(), network.id))
  184         self.mock_subnet_list.assert_called_once_with(test.IsHttpRequest(),
  185                                                       network_id=network.id)
  186         self._check_is_extension_supported(
  187             {'network-ip-availability': 2,
  188              'mac-learning': 1,
  189              'network_availability_zone': 1,
  190              'dhcp_agent_scheduler': 1})
  191         self.assert_mock_multiple_calls_with_same_arguments(
  192             self.mock_tenant_quota_usages, 3,
  193             mock.call(test.IsHttpRequest(), tenant_id=network.tenant_id,
  194                       targets=('subnet',)))
  195 
  196     @test.create_mocks({api.neutron: ('network_get',
  197                                       'port_list',
  198                                       'is_extension_supported'),
  199                         usage.quotas: ('tenant_quota_usages',)})
  200     def test_network_detail_ports_tab(self, mac_learning=False):
  201         network = self.networks.first()
  202         quota_data = self.neutron_quota_usages.first()
  203 
  204         self.mock_network_get.return_value = network
  205         self.mock_port_list.return_value = [self.ports.first()]
  206         self.mock_tenant_quota_usages.return_value = quota_data
  207         self._stub_is_extension_supported(
  208             {'network-ip-availability': True,
  209              'mac-learning': mac_learning,
  210              'network_availability_zone': True,
  211              'dhcp_agent_scheduler': True})
  212 
  213         url = reverse('horizon:admin:networks:ports_tab',
  214                       args=[network.id])
  215         res = self.client.get(urlunquote(url))
  216 
  217         self.assertTemplateUsed(res, 'horizon/common/_detail.html')
  218         ports = res.context['ports_table'].data
  219         self.assertItemsEqual(ports, [self.ports.first()])
  220 
  221         self.assert_mock_multiple_calls_with_same_arguments(
  222             self.mock_network_get, 2,
  223             mock.call(test.IsHttpRequest(), network.id))
  224         self.mock_port_list.assert_called_once_with(test.IsHttpRequest(),
  225                                                     network_id=network.id)
  226         self.assertEqual(3, self.mock_tenant_quota_usages.call_count)
  227         self.mock_tenant_quota_usages.assert_has_calls([
  228             mock.call(test.IsHttpRequest(), tenant_id=network.tenant_id,
  229                       targets=('subnet',)),
  230             mock.call(test.IsHttpRequest(), tenant_id=network.tenant_id,
  231                       targets=('port',)),
  232             mock.call(test.IsHttpRequest(), tenant_id=network.tenant_id,
  233                       targets=('port',)),
  234         ])
  235         self._check_is_extension_supported(
  236             {'network-ip-availability': 1,
  237              'mac-learning': 1,
  238              'network_availability_zone': 1,
  239              'dhcp_agent_scheduler': 1})
  240 
  241     @test.create_mocks({api.neutron: ('network_get',
  242                                       'is_extension_supported',
  243                                       'list_dhcp_agent_hosting_networks',),
  244                         usage.quotas: ('tenant_quota_usages',)})
  245     def test_network_detail_agents_tab(self, mac_learning=False):
  246         network = self.networks.first()
  247         quota_data = self.quota_usages.first()
  248 
  249         self._stub_is_extension_supported(
  250             {'network-ip-availability': True,
  251              'mac-learning': mac_learning,
  252              'network_availability_zone': True,
  253              'dhcp_agent_scheduler': True})
  254         self.mock_list_dhcp_agent_hosting_networks.return_value = \
  255             self.agents.list()
  256         self.mock_network_get.return_value = network
  257         self.mock_tenant_quota_usages.return_value = quota_data
  258 
  259         url = reverse('horizon:admin:networks:agents_tab', args=[network.id])
  260         res = self.client.get(urlunquote(url))
  261 
  262         self.assertTemplateUsed(res, 'horizon/common/_detail.html')
  263         result_agents = res.context['agents_table'].data
  264         expected_agents = self.agents.list()
  265 
  266         self.assertItemsEqual(result_agents, expected_agents)
  267 
  268         self._check_is_extension_supported(
  269             {'network-ip-availability': 1,
  270              'mac-learning': 1,
  271              'network_availability_zone': 1,
  272              'dhcp_agent_scheduler': 2})
  273         self.mock_list_dhcp_agent_hosting_networks.assert_called_once_with(
  274             test.IsHttpRequest(), network.id)
  275         self.mock_network_get.assert_called_once_with(
  276             test.IsHttpRequest(), network.id)
  277         self.mock_tenant_quota_usages.assert_called_once_with(
  278             test.IsHttpRequest(), tenant_id=network.tenant_id,
  279             targets=('subnet',))
  280 
  281     def test_network_detail_subnets_tab_network_exception(self):
  282         self._test_network_detail_subnets_tab_network_exception()
  283 
  284     def test_network_detail_network_exception_with_mac_learning(self):
  285         self._test_network_detail_subnets_tab_network_exception(
  286             mac_learning=True)
  287 
  288     @test.create_mocks({api.neutron: ('network_get',
  289                                       'subnet_list',
  290                                       'is_extension_supported',
  291                                       'show_network_ip_availability')})
  292     def _test_network_detail_subnets_tab_network_exception(self,
  293                                                            mac_learning=False):
  294         network_id = self.networks.first().id
  295         ip_availability = self.ip_availability.get()
  296 
  297         self.mock_show_network_ip_availability.return_value = ip_availability
  298         self.mock_network_get.side_effect = self.exceptions.neutron
  299         self.mock_subnet_list.return_value = [self.subnets.first()]
  300         self._stub_is_extension_supported(
  301             {'network-ip-availability': True,
  302              'mac-learning': mac_learning})
  303 
  304         url = urlunquote(reverse('horizon:admin:networks:subnets_tab',
  305                                  args=[network_id]))
  306         res = self.client.get(url)
  307 
  308         redir_url = INDEX_URL
  309         self.assertRedirectsNoFollow(res, redir_url)
  310 
  311         self.mock_show_network_ip_availability.assert_called_once_with(
  312             test.IsHttpRequest(), network_id)
  313         self.mock_network_get.assert_called_once_with(test.IsHttpRequest(),
  314                                                       network_id)
  315         self.mock_subnet_list.assert_called_once_with(test.IsHttpRequest(),
  316                                                       network_id=network_id)
  317         self._check_is_extension_supported(
  318             {'network-ip-availability': 2,
  319              'mac-learning': 1})
  320 
  321     def test_network_detail_subnets_tab_subnet_exception(self):
  322         self._test_network_detail_subnets_tab_subnet_exception()
  323 
  324     def test_network_detail_subnets_tab_subnet_exception_w_mac_learning(self):
  325         self._test_network_detail_subnets_tab_subnet_exception(
  326             mac_learning=True)
  327 
  328     @test.create_mocks({api.neutron: ('network_get',
  329                                       'subnet_list',
  330                                       'show_network_ip_availability',
  331                                       'is_extension_supported'),
  332                         usage.quotas: ('tenant_quota_usages',)})
  333     def _test_network_detail_subnets_tab_subnet_exception(self,
  334                                                           mac_learning=False):
  335         network = self.networks.first()
  336         quota_data = self.quota_usages.first()
  337 
  338         self.mock_show_network_ip_availability.return_value = \
  339             self.ip_availability.get()
  340         self.mock_network_get.return_value = network
  341         self.mock_subnet_list.side_effect = self.exceptions.neutron
  342         self._stub_is_extension_supported(
  343             {'network-ip-availability': True,
  344              'mac-learning': mac_learning,
  345              'dhcp_agent_scheduler': True,
  346              'network_availability_zone': True})
  347         self.mock_tenant_quota_usages.return_value = quota_data
  348 
  349         url = urlunquote(reverse('horizon:admin:networks:subnets_tab',
  350                          args=[network.id]))
  351         res = self.client.get(url)
  352 
  353         self.assertTemplateUsed(res, 'horizon/common/_detail.html')
  354         subnets = res.context['subnets_table'].data
  355         self.assertEqual(len(subnets), 0)
  356 
  357         self.mock_show_network_ip_availability.assert_called_once_with(
  358             test.IsHttpRequest(), network.id)
  359         self.assert_mock_multiple_calls_with_same_arguments(
  360             self.mock_network_get, 2,
  361             mock.call(test.IsHttpRequest(), network.id))
  362         self.mock_subnet_list.assert_called_once_with(test.IsHttpRequest(),
  363                                                       network_id=network.id)
  364         self.assert_mock_multiple_calls_with_same_arguments(
  365             self.mock_tenant_quota_usages, 3,
  366             mock.call(test.IsHttpRequest(), tenant_id=network.tenant_id,
  367                       targets=('subnet',)))
  368         self._stub_is_extension_supported(
  369             {'network-ip-availability': 1,
  370              'mac-learning': 1,
  371              'dhcp_agent_scheduler': 2,
  372              'network_availability_zone': 1})
  373 
  374     def test_network_detail_port_exception(self):
  375         self._test_network_detail_subnets_tab_port_exception()
  376 
  377     def test_network_detail_subnets_tab_port_exception_with_mac_learning(self):
  378         self._test_network_detail_subnets_tab_port_exception(mac_learning=True)
  379 
  380     @test.create_mocks({api.neutron: ('network_get',
  381                                       'subnet_list',
  382                                       'is_extension_supported',
  383                                       'show_network_ip_availability'),
  384                         usage.quotas: ('tenant_quota_usages',)})
  385     def _test_network_detail_subnets_tab_port_exception(self,
  386                                                         mac_learning=False):
  387         network = self.networks.first()
  388         ip_availability = self.ip_availability.get()
  389         quota_data = self.quota_usages.first()
  390 
  391         self.mock_show_network_ip_availability.return_value = ip_availability
  392         self.mock_network_get.return_value = network
  393         self.mock_subnet_list.return_value = [self.subnets.first()]
  394         self._stub_is_extension_supported(
  395             {'network-ip-availability': True,
  396              'mac-learning': mac_learning,
  397              'network_availability_zone': True,
  398              'dhcp_agent_scheduler': True})
  399         self.mock_tenant_quota_usages.return_value = quota_data
  400 
  401         url = urlunquote(reverse('horizon:admin:networks:subnets_tab',
  402                          args=[network.id]))
  403         res = self.client.get(url)
  404 
  405         self.assertTemplateUsed(res, 'horizon/common/_detail.html')
  406         subnets = res.context['subnets_table'].data
  407         self.assertItemsEqual(subnets, [self.subnets.first()])
  408 
  409         self.mock_show_network_ip_availability.assert_called_once_with(
  410             test.IsHttpRequest(), network.id)
  411         self.assert_mock_multiple_calls_with_same_arguments(
  412             self.mock_network_get, 2,
  413             mock.call(test.IsHttpRequest(), network.id))
  414         self.mock_subnet_list.assert_called_once_with(test.IsHttpRequest(),
  415                                                       network_id=network.id)
  416         self._check_is_extension_supported(
  417             {'network-ip-availability': 2,
  418              'mac-learning': 1,
  419              'network_availability_zone': 1,
  420              'dhcp_agent_scheduler': 1})
  421         self.assert_mock_multiple_calls_with_same_arguments(
  422             self.mock_tenant_quota_usages, 3,
  423             mock.call(test.IsHttpRequest(), tenant_id=network.tenant_id,
  424                       targets=('subnet',)))
  425 
  426     @test.create_mocks({api.neutron: ('is_extension_supported',),
  427                         api.keystone: ('tenant_list',)})
  428     def test_network_create_get(self):
  429         tenants = self.tenants.list()
  430         self.mock_tenant_list.return_value = [tenants, False]
  431         self._stub_is_extension_supported(
  432             {'provider': True,
  433              'network_availability_zone': False,
  434              'subnet_allocation': False})
  435 
  436         url = reverse('horizon:admin:networks:create')
  437         res = self.client.get(url)
  438 
  439         self.assertTemplateUsed(res, 'horizon/common/_workflow_base.html')
  440 
  441         self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
  442         self._check_is_extension_supported(
  443             {'provider': 1,
  444              'network_availability_zone': 2,
  445              'subnet_allocation': 1})
  446 
  447     @test.create_mocks({api.neutron: ('network_create',
  448                                       'is_extension_supported',
  449                                       'subnetpool_list'),
  450                         api.keystone: ('tenant_list',)})
  451     def test_network_create_post(self):
  452         tenants = self.tenants.list()
  453         tenant_id = self.tenants.first().id
  454         network = self.networks.first()
  455 
  456         self.mock_tenant_list.return_value = [tenants, False]
  457         self._stub_is_extension_supported(
  458             {'provider': True,
  459              'network_availability_zone': False,
  460              'subnet_allocation': True})
  461         self.mock_subnetpool_list.return_value = self.subnetpools.list()
  462         self.mock_network_create.return_value = network
  463 
  464         form_data = {'tenant_id': tenant_id,
  465                      'name': network.name,
  466                      'admin_state': network.admin_state_up,
  467                      'external': True,
  468                      'shared': True,
  469                      'network_type': 'local'}
  470         url = reverse('horizon:admin:networks:create')
  471         res = self.client.post(url, form_data)
  472 
  473         self.assertNoFormErrors(res)
  474         self.assertRedirectsNoFollow(res, INDEX_URL)
  475 
  476         self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
  477         self.mock_subnetpool_list.assert_called_once_with(test.IsHttpRequest())
  478         params = {'name': network.name,
  479                   'tenant_id': tenant_id,
  480                   'admin_state_up': network.admin_state_up,
  481                   'router:external': True,
  482                   'shared': True,
  483                   'provider:network_type': 'local'}
  484         self.mock_network_create.assert_called_once_with(test.IsHttpRequest(),
  485                                                          **params)
  486         self._check_is_extension_supported(
  487             {'provider': 3,
  488              'network_availability_zone': 2,
  489              'subnet_allocation': 1})
  490 
  491     @test.create_mocks({api.neutron: ('network_create',
  492                                       'is_extension_supported',
  493                                       'list_availability_zones',
  494                                       'subnetpool_list'),
  495                         api.keystone: ('tenant_list',)})
  496     def test_network_create_post_with_az(self):
  497         tenants = self.tenants.list()
  498         tenant_id = self.tenants.first().id
  499         network = self.networks.first()
  500 
  501         self.mock_tenant_list.return_value = [tenants, False]
  502         self._stub_is_extension_supported(
  503             {'provider': True,
  504              'network_availability_zone': True,
  505              'subnet_allocation': True})
  506         self.mock_list_availability_zones.return_value = \
  507             self.neutron_availability_zones.list()
  508         self.mock_subnetpool_list.return_value = self.subnetpools.list()
  509         self.mock_network_create.return_value = network
  510 
  511         form_data = {'tenant_id': tenant_id,
  512                      'name': network.name,
  513                      'admin_state': network.admin_state_up,
  514                      'external': True,
  515                      'shared': True,
  516                      'network_type': 'local',
  517                      'az_hints': ['nova']}
  518         url = reverse('horizon:admin:networks:create')
  519         res = self.client.post(url, form_data)
  520 
  521         self.assertNoFormErrors(res)
  522         self.assertRedirectsNoFollow(res, INDEX_URL)
  523 
  524         self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
  525         self._stub_is_extension_supported(
  526             {'provider': 1,
  527              'network_availability_zone': 1,
  528              'subnet_allocation': 1})
  529         self.assert_mock_multiple_calls_with_same_arguments(
  530             self.mock_list_availability_zones, 2,
  531             mock.call(test.IsHttpRequest(), "network", "available"))
  532         self.mock_subnetpool_list.assert_called_once_with(test.IsHttpRequest())
  533         params = {'name': network.name,
  534                   'tenant_id': tenant_id,
  535                   'admin_state_up': network.admin_state_up,
  536                   'router:external': True,
  537                   'shared': True,
  538                   'provider:network_type': 'local',
  539                   'availability_zone_hints': ['nova']}
  540         self.mock_network_create.assert_called_once_with(test.IsHttpRequest(),
  541                                                          **params)
  542 
  543     @test.create_mocks({api.neutron: ('network_create',
  544                                       'subnet_create',
  545                                       'is_extension_supported',
  546                                       'subnetpool_list'),
  547                         api.keystone: ('tenant_list',)})
  548     def test_network_create_post_with_subnet(self):
  549         tenants = self.tenants.list()
  550         tenant_id = self.tenants.first().id
  551         network = self.networks.first()
  552         subnet = self.subnets.first()
  553 
  554         self._stub_is_extension_supported(
  555             {'provider': True,
  556              'network_availability_zone': False,
  557              'subnet_allocation': True})
  558         self.mock_tenant_list.return_value = [tenants, False]
  559         self.mock_subnetpool_list.return_value = self.subnetpools.list()
  560         self.mock_network_create.return_value = network
  561         self.mock_subnet_create.return_value = subnet
  562 
  563         form_data = {'tenant_id': tenant_id,
  564                      'name': network.name,
  565                      'admin_state': network.admin_state_up,
  566                      'external': True,
  567                      'shared': True,
  568                      'network_type': 'local',
  569                      'with_subnet': True}
  570         form_data.update(tests.form_data_subnet(subnet, allocation_pools=[]))
  571         url = reverse('horizon:admin:networks:create')
  572         res = self.client.post(url, form_data)
  573 
  574         self.assertNoFormErrors(res)
  575         self.assertRedirectsNoFollow(res, INDEX_URL)
  576 
  577         self._check_is_extension_supported(
  578             {'provider': 3,
  579              'network_availability_zone': 2,
  580              'subnet_allocation': 1})
  581         self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
  582         self.mock_subnetpool_list.assert_called_once_with(test.IsHttpRequest())
  583         params = {'name': network.name,
  584                   'tenant_id': tenant_id,
  585                   'admin_state_up': network.admin_state_up,
  586                   'router:external': True,
  587                   'shared': True,
  588                   'provider:network_type': 'local'}
  589         self.mock_network_create.assert_called_once_with(test.IsHttpRequest(),
  590                                                          **params)
  591         subnet_params = {'name': subnet.name,
  592                          'network_id': subnet.network_id,
  593                          'cidr': subnet.cidr,
  594                          'enable_dhcp': subnet.enable_dhcp,
  595                          'gateway_ip': subnet.gateway_ip,
  596                          'ip_version': subnet.ip_version}
  597         self.mock_subnet_create.assert_called_once_with(test.IsHttpRequest(),
  598                                                         **subnet_params)
  599 
  600     @test.create_mocks({api.neutron: ('network_create',
  601                                       'is_extension_supported',
  602                                       'subnetpool_list'),
  603                         api.keystone: ('tenant_list',)})
  604     def test_network_create_post_network_exception(self):
  605         tenants = self.tenants.list()
  606         tenant_id = self.tenants.first().id
  607         network = self.networks.first()
  608 
  609         self.mock_tenant_list.return_value = [tenants, False]
  610         self._stub_is_extension_supported(
  611             {'provider': True,
  612              'network_availability_zone': False,
  613              'subnet_allocation': True})
  614         self.mock_subnetpool_list.return_value = self.subnetpools.list()
  615         self.mock_network_create.side_effect = self.exceptions.neutron
  616 
  617         form_data = {'tenant_id': tenant_id,
  618                      'name': network.name,
  619                      'admin_state': network.admin_state_up,
  620                      'external': True,
  621                      'shared': False,
  622                      'network_type': 'local'}
  623         url = reverse('horizon:admin:networks:create')
  624         res = self.client.post(url, form_data)
  625 
  626         self.assertNoFormErrors(res)
  627         self.assertRedirectsNoFollow(res, INDEX_URL)
  628 
  629         self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
  630         self._check_is_extension_supported(
  631             {'provider': 3,
  632              'network_availability_zone': 2,
  633              'subnet_allocation': 1})
  634         self.mock_subnetpool_list.assert_called_once_with(test.IsHttpRequest())
  635         params = {'name': network.name,
  636                   'tenant_id': tenant_id,
  637                   'admin_state_up': network.admin_state_up,
  638                   'router:external': True,
  639                   'shared': False,
  640                   'provider:network_type': 'local'}
  641         self.mock_network_create.assert_called_once_with(test.IsHttpRequest(),
  642                                                          **params)
  643 
  644     @test.create_mocks({api.neutron: ('is_extension_supported',),
  645                         api.keystone: ('tenant_list',)})
  646     def test_network_create_vlan_segmentation_id_invalid(self):
  647         tenants = self.tenants.list()
  648         tenant_id = self.tenants.first().id
  649         network = self.networks.first()
  650 
  651         self.mock_tenant_list.return_value = [tenants, False]
  652         self._stub_is_extension_supported(
  653             {'network_availability_zone': False,
  654              'subnet_allocation': False,
  655              'provider': True})
  656 
  657         form_data = {'tenant_id': tenant_id,
  658                      'name': network.name,
  659                      'admin_state': network.admin_state_up,
  660                      'external': True,
  661                      'shared': False,
  662                      'network_type': 'vlan',
  663                      'physical_network': 'default',
  664                      'segmentation_id': 4095}
  665         url = reverse('horizon:admin:networks:create')
  666         res = self.client.post(url, form_data)
  667 
  668         self.assertFormErrors(res, 1)
  669         self.assertContains(res, "1 through 4094")
  670 
  671         self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
  672         self._check_is_extension_supported(
  673             {'network_availability_zone': 2,
  674              'subnet_allocation': 1,
  675              'provider': 2})
  676 
  677     @test.create_mocks({api.neutron: ('is_extension_supported',),
  678                         api.keystone: ('tenant_list',)})
  679     def test_network_create_gre_segmentation_id_invalid(self):
  680         tenants = self.tenants.list()
  681         tenant_id = self.tenants.first().id
  682         network = self.networks.first()
  683 
  684         self.mock_tenant_list.return_value = [tenants, False]
  685         self._stub_is_extension_supported(
  686             {'network_availability_zone': False,
  687              'subnet_allocation': False,
  688              'provider': True})
  689 
  690         form_data = {'tenant_id': tenant_id,
  691                      'name': network.name,
  692                      'admin_state': network.admin_state_up,
  693                      'external': True,
  694                      'shared': False,
  695                      'network_type': 'gre',
  696                      'physical_network': 'default',
  697                      'segmentation_id': (2 ** 32) + 1}
  698         url = reverse('horizon:admin:networks:create')
  699         res = self.client.post(url, form_data)
  700 
  701         self.assertFormErrors(res, 1)
  702         self.assertContains(res, "1 through %s" % ((2 ** 32) - 1))
  703 
  704         self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
  705         self._check_is_extension_supported(
  706             {'network_availability_zone': 2,
  707              'subnet_allocation': 1,
  708              'provider': 2})
  709 
  710     @test.create_mocks({api.neutron: ('is_extension_supported',),
  711                         api.keystone: ('tenant_list',)})
  712     @test.update_settings(
  713         OPENSTACK_NEUTRON_NETWORK={
  714             'segmentation_id_range': {'vxlan': [10, 20]}})
  715     def test_network_create_vxlan_segmentation_id_custom(self):
  716         tenants = self.tenants.list()
  717         tenant_id = self.tenants.first().id
  718         network = self.networks.first()
  719 
  720         self.mock_tenant_list.return_value = [tenants, False]
  721         self._stub_is_extension_supported(
  722             {'network_availability_zone': False,
  723              'subnet_allocation': False,
  724              'provider': True})
  725 
  726         form_data = {'tenant_id': tenant_id,
  727                      'name': network.name,
  728                      'admin_state': network.admin_state_up,
  729                      'external': True,
  730                      'shared': False,
  731                      'network_type': 'vxlan',
  732                      'physical_network': 'default',
  733                      'segmentation_id': 9}
  734         url = reverse('horizon:admin:networks:create')
  735         res = self.client.post(url, form_data)
  736 
  737         self.assertFormErrors(res, 1)
  738         self.assertContains(res, "10 through 20")
  739 
  740         self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
  741         self._check_is_extension_supported(
  742             {'network_availability_zone': 2,
  743              'subnet_allocation': 1,
  744              'provider': 2})
  745 
  746     @test.create_mocks({api.neutron: ('is_extension_supported',),
  747                         api.keystone: ('tenant_list',)})
  748     @test.update_settings(
  749         OPENSTACK_NEUTRON_NETWORK={
  750             'supported_provider_types': []})
  751     def test_network_create_no_provider_types(self):
  752         tenants = self.tenants.list()
  753 
  754         self.mock_tenant_list.return_value = [tenants, False]
  755         self._stub_is_extension_supported(
  756             {'network_availability_zone': False,
  757              'subnet_allocation': False,
  758              'provider': True})
  759 
  760         url = reverse('horizon:admin:networks:create')
  761         res = self.client.get(url)
  762 
  763         self.assertTemplateUsed(res, 'horizon/common/_workflow_base.html')
  764         self.assertContains(
  765             res,
  766             '<input type="hidden" name="network_type" id="id_network_type" />',
  767             html=True)
  768 
  769         self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
  770         self._check_is_extension_supported(
  771             {'network_availability_zone': 2,
  772              'subnet_allocation': 1,
  773              'provider': 1})
  774 
  775     @test.create_mocks({api.neutron: ('is_extension_supported',),
  776                         api.keystone: ('tenant_list',)})
  777     @test.update_settings(
  778         OPENSTACK_NEUTRON_NETWORK={
  779             'supported_provider_types': ['local', 'flat', 'gre']})
  780     def test_network_create_unsupported_provider_types(self):
  781         tenants = self.tenants.list()
  782 
  783         self.mock_tenant_list.return_value = [tenants, False]
  784         self._stub_is_extension_supported(
  785             {'network_availability_zone': False,
  786              'subnet_allocation': False,
  787              'provider': True})
  788 
  789         url = reverse('horizon:admin:networks:create')
  790         res = self.client.get(url)
  791 
  792         self.assertTemplateUsed(res, 'horizon/common/_workflow_base.html')
  793         network_type = res.context['form'].fields['network_type']
  794         self.assertListEqual(list(network_type.choices), [('local', 'Local'),
  795                                                           ('flat', 'Flat'),
  796                                                           ('gre', 'GRE')])
  797 
  798         self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
  799         self._check_is_extension_supported(
  800             {'network_availability_zone': 2,
  801              'subnet_allocation': 1,
  802              'provider': 1})
  803 
  804     @test.create_mocks({api.neutron: ('network_get',)})
  805     def test_network_update_get(self):
  806         network = self.networks.first()
  807         self.mock_network_get.return_value = network
  808 
  809         url = reverse('horizon:admin:networks:update', args=[network.id])
  810         res = self.client.get(url)
  811 
  812         self.assertTemplateUsed(res, 'admin/networks/update.html')
  813 
  814         self.mock_network_get.assert_called_once_with(test.IsHttpRequest(),
  815                                                       network.id,
  816                                                       expand_subnet=False)
  817 
  818     @test.create_mocks({api.neutron: ('network_get',)})
  819     def test_network_update_get_exception(self):
  820         network = self.networks.first()
  821         self.mock_network_get.side_effect = self.exceptions.neutron
  822 
  823         url = reverse('horizon:admin:networks:update', args=[network.id])
  824         res = self.client.get(url)
  825 
  826         redir_url = INDEX_URL
  827         self.assertRedirectsNoFollow(res, redir_url)
  828 
  829         self.mock_network_get.assert_called_once_with(test.IsHttpRequest(),
  830                                                       network.id,
  831                                                       expand_subnet=False)
  832 
  833     @test.create_mocks({api.neutron: ('network_update',
  834                                       'network_get',)})
  835     def test_network_update_post(self):
  836         network = self.networks.first()
  837 
  838         self.mock_network_update.return_value = network
  839         self.mock_network_get.return_value = network
  840 
  841         form_data = {'network_id': network.id,
  842                      'name': network.name,
  843                      'tenant_id': network.tenant_id,
  844                      'admin_state': network.admin_state_up,
  845                      'shared': True,
  846                      'external': True}
  847         url = reverse('horizon:admin:networks:update', args=[network.id])
  848         res = self.client.post(url, form_data)
  849 
  850         self.assertRedirectsNoFollow(res, INDEX_URL)
  851 
  852         params = {'name': network.name,
  853                   'shared': True,
  854                   'admin_state_up': network.admin_state_up,
  855                   'router:external': True}
  856         self.mock_network_update.assert_called_once_with(test.IsHttpRequest(),
  857                                                          network.id,
  858                                                          **params)
  859         self.mock_network_get.assert_called_once_with(test.IsHttpRequest(),
  860                                                       network.id,
  861                                                       expand_subnet=False)
  862 
  863     @test.create_mocks({api.neutron: ('network_update',
  864                                       'network_get',)})
  865     def test_network_update_post_exception(self):
  866         network = self.networks.first()
  867         params = {'name': network.name,
  868                   'shared': False,
  869                   'admin_state_up': network.admin_state_up,
  870                   'router:external': False}
  871         self.mock_network_update.side_effect = self.exceptions.neutron
  872         self.mock_network_get.return_value = network
  873 
  874         form_data = {'network_id': network.id,
  875                      'name': network.name,
  876                      'tenant_id': network.tenant_id,
  877                      'admin_state': network.admin_state_up,
  878                      'shared': False,
  879                      'external': False}
  880         url = reverse('horizon:admin:networks:update', args=[network.id])
  881         res = self.client.post(url, form_data)
  882 
  883         self.assertRedirectsNoFollow(res, INDEX_URL)
  884 
  885         self.mock_network_update.assert_called_once_with(test.IsHttpRequest(),
  886                                                          network.id,
  887                                                          **params)
  888         self.mock_network_get.assert_called_once_with(test.IsHttpRequest(),
  889                                                       network.id,
  890                                                       expand_subnet=False)
  891 
  892     @test.create_mocks({api.neutron: ('network_list',
  893                                       'network_delete',
  894                                       'list_dhcp_agent_hosting_networks',
  895                                       'is_extension_supported'),
  896                         api.keystone: ('tenant_list',)})
  897     def test_delete_network(self):
  898         tenants = self.tenants.list()
  899         network = self.networks.first()
  900 
  901         self.mock_list_dhcp_agent_hosting_networks.return_value = \
  902             self.agents.list()
  903         self._stub_is_extension_supported(
  904             {'network_availability_zone': True,
  905              'dhcp_agent_scheduler': True})
  906         self.mock_tenant_list.return_value = [tenants, False]
  907         self.mock_network_list.return_value = [network]
  908         self.mock_network_delete.return_value = None
  909 
  910         form_data = {'action': 'networks__delete__%s' % network.id}
  911         res = self.client.post(INDEX_URL, form_data)
  912 
  913         self.assertRedirectsNoFollow(res, INDEX_URL)
  914 
  915         self.mock_list_dhcp_agent_hosting_networks.assert_called_once_with(
  916             test.IsHttpRequest(), network.id)
  917         self._check_is_extension_supported(
  918             {'network_availability_zone': 1,
  919              'dhcp_agent_scheduler': 2})
  920         self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
  921         self.mock_network_list.assert_called_once_with(test.IsHttpRequest())
  922         self.mock_network_delete.assert_called_once_with(test.IsHttpRequest(),
  923                                                          network.id)
  924 
  925     @test.create_mocks({api.neutron: ('network_list',
  926                                       'network_delete',
  927                                       'list_dhcp_agent_hosting_networks',
  928                                       'is_extension_supported'),
  929                         api.keystone: ('tenant_list',)})
  930     def test_delete_network_exception(self):
  931         tenants = self.tenants.list()
  932         network = self.networks.first()
  933 
  934         self.mock_list_dhcp_agent_hosting_networks.return_value = \
  935             self.agents.list()
  936         self._stub_is_extension_supported(
  937             {'network_availability_zone': True,
  938              'dhcp_agent_scheduler': True})
  939         self.mock_tenant_list.return_value = [tenants, False]
  940         self.mock_network_list.return_value = [network]
  941         self.mock_network_delete.side_effect = self.exceptions.neutron
  942 
  943         form_data = {'action': 'networks__delete__%s' % network.id}
  944         res = self.client.post(INDEX_URL, form_data)
  945 
  946         self.assertRedirectsNoFollow(res, INDEX_URL)
  947 
  948         self.mock_list_dhcp_agent_hosting_networks.assert_called_once_with(
  949             test.IsHttpRequest(), network.id)
  950         self._check_is_extension_supported(
  951             {'network_availability_zone': 1,
  952              'dhcp_agent_scheduler': 2})
  953         self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
  954         self.mock_network_list.assert_called_once_with(test.IsHttpRequest())
  955         self.mock_network_delete.assert_called_once_with(test.IsHttpRequest(),
  956                                                          network.id)
  957 
  958     @test.create_mocks({api.neutron: ('is_extension_supported',)})
  959     @test.update_settings(FILTER_DATA_FIRST={'admin.networks': True})
  960     def test_networks_list_with_admin_filter_first(self):
  961         self._stub_is_extension_supported(
  962             {'network_availability_zone': True,
  963              'dhcp_agent_scheduler': True})
  964 
  965         res = self.client.get(reverse('horizon:admin:networks:index'))
  966         self.assertTemplateUsed(res, INDEX_TEMPLATE)
  967         networks = res.context['networks_table'].data
  968         self.assertItemsEqual(networks, [])
  969 
  970         self._check_is_extension_supported(
  971             {'network_availability_zone': 1,
  972              'dhcp_agent_scheduler': 1})
  973 
  974     @test.create_mocks({api.keystone: ('tenant_list',),
  975                         api.neutron: ('is_extension_supported',)})
  976     def test_networks_list_with_non_exist_tenant_filter(self):
  977         self._stub_is_extension_supported(
  978             {'network_availability_zone': True,
  979              'dhcp_agent_scheduler': True})
  980         self.mock_tenant_list.return_value = [self.tenants.list(), False]
  981 
  982         self.client.post(
  983             reverse('horizon:admin:networks:index'),
  984             data={'networks__filter_admin_networks__q_field': 'project',
  985                   'networks__filter_admin_networks__q': 'non_exist_tenant'})
  986         res = self.client.get(reverse('horizon:admin:networks:index'))
  987         self.assertTemplateUsed(res, INDEX_TEMPLATE)
  988         networks = res.context['networks_table'].data
  989         self.assertItemsEqual(networks, [])
  990 
  991         self._check_is_extension_supported(
  992             {'network_availability_zone': 2,
  993              'dhcp_agent_scheduler': 2})
  994         self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
  995 
  996     @test.create_mocks({api.neutron: ('is_extension_supported',),
  997                         api.keystone: ('tenant_list',)})
  998     def test_network_create_without_physical_networks(self):
  999         tenants = self.tenants.list()
 1000         self.mock_tenant_list.return_value = [tenants, False]
 1001         self._stub_is_extension_supported(
 1002             {'provider': True,
 1003              'network_availability_zone': False,
 1004              'subnet_allocation': False})
 1005 
 1006         url = reverse('horizon:admin:networks:create')
 1007         res = self.client.get(url)
 1008         physical_network = res.context['form'].fields['physical_network']
 1009         self.assertEqual(type(physical_network), forms.CharField)
 1010 
 1011         self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
 1012         self._check_is_extension_supported(
 1013             {'provider': 1,
 1014              'network_availability_zone': 2,
 1015              'subnet_allocation': 1})
 1016 
 1017     @test.create_mocks({api.neutron: ('is_extension_supported',),
 1018                         api.keystone: ('tenant_list',)})
 1019     @test.update_settings(
 1020         OPENSTACK_NEUTRON_NETWORK={
 1021             'physical_networks': ['default', 'test']})
 1022     def test_network_create_with_physical_networks(self):
 1023         tenants = self.tenants.list()
 1024         self.mock_tenant_list.return_value = [tenants, False]
 1025         self._stub_is_extension_supported(
 1026             {'provider': True,
 1027              'network_availability_zone': False,
 1028              'subnet_allocation': False})
 1029 
 1030         url = reverse('horizon:admin:networks:create')
 1031         res = self.client.get(url)
 1032         physical_network = res.context['form'].fields['physical_network']
 1033         self.assertEqual(type(physical_network), forms.ThemableChoiceField)
 1034         self.assertListEqual(list(physical_network.choices),
 1035                              [('default', 'default'), ('test', 'test')])
 1036 
 1037         self.mock_tenant_list.assert_called_once_with(test.IsHttpRequest())
 1038         self._check_is_extension_supported(
 1039             {'provider': 1,
 1040              'network_availability_zone': 2,
 1041              'subnet_allocation': 1})