"Fossies" - the Fresh Open Source Software Archive

Member "horizon-16.0.0/openstack_dashboard/test/unit/api/test_swift.py" (16 Oct 2019, 11195 Bytes) of package /linux/misc/openstack/horizon-16.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here.

    1 # Copyright 2012 United States Government as represented by the
    2 # Administrator of the National Aeronautics and Space Administration.
    3 # All Rights Reserved.
    4 #
    5 # Copyright 2012 Nebula, Inc.
    6 #
    7 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    8 #    not use this file except in compliance with the License. You may obtain
    9 #    a copy of the License at
   10 #
   11 #         http://www.apache.org/licenses/LICENSE-2.0
   12 #
   13 #    Unless required by applicable law or agreed to in writing, software
   14 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   15 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   16 #    License for the specific language governing permissions and limitations
   17 #    under the License.
   18 
   19 from __future__ import absolute_import
   20 
   21 import mock
   22 
   23 from horizon import exceptions
   24 
   25 from openstack_dashboard import api
   26 from openstack_dashboard.test import helpers as test
   27 
   28 
   29 @mock.patch('swiftclient.client.Connection')
   30 class SwiftApiTests(test.APIMockTestCase):
   31     def test_swift_get_containers(self, mock_swiftclient):
   32         containers = self.containers.list()
   33         cont_data = [c._apidict for c in containers]
   34         swift_api = mock_swiftclient.return_value
   35         swift_api.get_account.return_value = [{}, cont_data]
   36 
   37         (conts, more) = api.swift.swift_get_containers(self.request)
   38 
   39         self.assertEqual(len(containers), len(conts))
   40         self.assertFalse(more)
   41         swift_api.get_account.assert_called_once_with(
   42             limit=1001, marker=None, prefix=None, full_listing=True)
   43 
   44     def test_swift_get_container_with_data(self, mock_swiftclient):
   45         container = self.containers.first()
   46         objects = self.objects.list()
   47         swift_api = mock_swiftclient.return_value
   48         swift_api.get_object.return_value = (container, objects)
   49 
   50         cont = api.swift.swift_get_container(self.request, container.name)
   51 
   52         self.assertEqual(container.name, cont.name)
   53         self.assertEqual(len(objects), len(cont.data))
   54         swift_api.get_object.assert_called_once_with(container.name, "")
   55 
   56     def test_swift_get_container_without_data(self, mock_swiftclient):
   57         container = self.containers.first()
   58         swift_api = mock_swiftclient.return_value
   59         swift_api.head_container.return_value = container
   60 
   61         cont = api.swift.swift_get_container(self.request,
   62                                              container.name,
   63                                              with_data=False)
   64 
   65         self.assertEqual(cont.name, container.name)
   66         self.assertIsNone(cont.data)
   67         swift_api.head_container.assert_called_once_with(container.name)
   68 
   69     def test_swift_create_duplicate_container(self, mock_swiftclient):
   70         metadata = {'is_public': False}
   71         container = self.containers.first()
   72         headers = api.swift._metadata_to_header(metadata=(metadata))
   73         swift_api = mock_swiftclient.return_value
   74         # Check for existence, then create
   75         swift_api.head_container.side_effect = self.exceptions.swift
   76         swift_api.put_container.return_value = container
   77 
   78         api.swift.swift_create_container(self.request,
   79                                          container.name,
   80                                          metadata=(metadata))
   81 
   82         swift_api.head_container.assert_called_once_with(container.name)
   83         swift_api.put_container.assert_called_once_with(container.name,
   84                                                         headers=headers)
   85 
   86     def test_swift_create_container(self, mock_swiftclient):
   87         metadata = {'is_public': True}
   88         container = self.containers.first()
   89         swift_api = mock_swiftclient.return_value
   90         swift_api.head_container.return_value = container
   91 
   92         with self.assertRaises(exceptions.AlreadyExists):
   93             api.swift.swift_create_container(self.request,
   94                                              container.name,
   95                                              metadata=(metadata))
   96 
   97         swift_api.head_container.assert_called_once_with(container.name)
   98 
   99     def test_swift_update_container(self, mock_swiftclient):
  100         metadata = {'is_public': True}
  101         container = self.containers.first()
  102         swift_api = mock_swiftclient.return_value
  103         headers = api.swift._metadata_to_header(metadata=(metadata))
  104         swift_api.post_container.return_value = container
  105 
  106         api.swift.swift_update_container(self.request,
  107                                          container.name,
  108                                          metadata=(metadata))
  109 
  110         swift_api.post_container.assert_called_once_with(container.name,
  111                                                          headers=headers)
  112 
  113     def test_swift_get_objects(self, mock_swiftclient):
  114         container = self.containers.first()
  115         objects = self.objects.list()
  116 
  117         swift_api = mock_swiftclient.return_value
  118         swift_api.get_container.return_value = [{}, objects]
  119 
  120         (objs, more) = api.swift.swift_get_objects(self.request,
  121                                                    container.name)
  122 
  123         self.assertEqual(len(objects), len(objs))
  124         self.assertFalse(more)
  125         swift_api.get_container.assert_called_once_with(
  126             container.name,
  127             limit=1001,
  128             marker=None,
  129             prefix=None,
  130             delimiter='/',
  131             full_listing=True)
  132 
  133     def test_swift_get_object_with_data_non_chunked(self, mock_swiftclient):
  134         container = self.containers.first()
  135         object = self.objects.first()
  136 
  137         swift_api = mock_swiftclient.return_value
  138         swift_api.get_object.return_value = [object, object.data]
  139 
  140         obj = api.swift.swift_get_object(self.request, container.name,
  141                                          object.name, resp_chunk_size=None)
  142 
  143         self.assertEqual(object.name, obj.name)
  144         swift_api.get_object.assert_called_once_with(
  145             container.name, object.name, resp_chunk_size=None)
  146 
  147     def test_swift_get_object_with_data_chunked(self, mock_swiftclient):
  148         container = self.containers.first()
  149         object = self.objects.first()
  150 
  151         swift_api = mock_swiftclient.return_value
  152         swift_api.get_object.return_value = [object, object.data]
  153 
  154         obj = api.swift.swift_get_object(
  155             self.request, container.name, object.name)
  156 
  157         self.assertEqual(object.name, obj.name)
  158         swift_api.get_object.assert_called_once_with(
  159             container.name, object.name, resp_chunk_size=api.swift.CHUNK_SIZE)
  160 
  161     def test_swift_get_object_without_data(self, mock_swiftclient):
  162         container = self.containers.first()
  163         object = self.objects.first()
  164 
  165         swift_api = mock_swiftclient.return_value
  166         swift_api.head_object.return_value = object
  167 
  168         obj = api.swift.swift_get_object(self.request,
  169                                          container.name,
  170                                          object.name,
  171                                          with_data=False)
  172 
  173         self.assertEqual(object.name, obj.name)
  174         self.assertIsNone(obj.data)
  175         swift_api.head_object.assert_called_once_with(container.name,
  176                                                       object.name)
  177 
  178     def test_swift_create_pseudo_folder(self, mock_swiftclient):
  179         container = self.containers.first()
  180         folder = self.folder.first()
  181         swift_api = mock_swiftclient.return_value
  182         exc = self.exceptions.swift
  183         swift_api.head_object.side_effect = exc
  184         swift_api.put_object.return_value = folder
  185 
  186         api.swift.swift_create_pseudo_folder(self.request,
  187                                              container.name,
  188                                              folder.name)
  189 
  190         swift_api.head_object.assert_called_once_with(container.name,
  191                                                       folder.name)
  192         swift_api.put_object.assert_called_once_with(container.name,
  193                                                      folder.name,
  194                                                      None,
  195                                                      headers={})
  196 
  197     def test_swift_create_duplicate_folder(self, mock_swiftclient):
  198         container = self.containers.first()
  199         folder = self.folder.first()
  200         swift_api = mock_swiftclient.return_value
  201         swift_api.head_object.return_value = folder
  202 
  203         with self.assertRaises(exceptions.AlreadyExists):
  204             api.swift.swift_create_pseudo_folder(self.request,
  205                                                  container.name,
  206                                                  folder.name)
  207 
  208         swift_api.head_object.assert_called_once_with(container.name,
  209                                                       folder.name)
  210 
  211     def test_swift_upload_object(self, mock_swiftclient):
  212         container = self.containers.first()
  213         obj = self.objects.first()
  214         fake_name = 'fake_object.jpg'
  215 
  216         class FakeFile(object):
  217             def __init__(self):
  218                 self.name = fake_name
  219                 self.data = obj.data
  220                 self.size = len(obj.data)
  221 
  222         headers = {'X-Object-Meta-Orig-Filename': fake_name}
  223 
  224         swift_api = mock_swiftclient.return_value
  225         test_file = FakeFile()
  226         swift_api.put_object.return_value = None
  227 
  228         api.swift.swift_upload_object(self.request,
  229                                       container.name,
  230                                       obj.name,
  231                                       test_file)
  232 
  233         swift_api.put_object.assert_called_once_with(
  234             container.name,
  235             obj.name,
  236             test.IsA(FakeFile),
  237             content_length=test_file.size,
  238             headers=headers)
  239 
  240     def test_swift_upload_object_without_file(self, mock_swiftclient):
  241         container = self.containers.first()
  242         obj = self.objects.first()
  243 
  244         swift_api = mock_swiftclient.return_value
  245         swift_api.put_object.return_value = None
  246 
  247         response = api.swift.swift_upload_object(self.request,
  248                                                  container.name,
  249                                                  obj.name,
  250                                                  None)
  251 
  252         self.assertEqual(0, response['bytes'])
  253         swift_api.put_object.assert_called_once_with(
  254             container.name,
  255             obj.name,
  256             None,
  257             content_length=0,
  258             headers={})
  259 
  260     def test_swift_object_exists(self, mock_swiftclient):
  261         container = self.containers.first()
  262         obj = self.objects.first()
  263 
  264         swift_api = mock_swiftclient.return_value
  265         swift_api.head_object.side_effect = [container, self.exceptions.swift]
  266 
  267         args = self.request, container.name, obj.name
  268 
  269         self.assertTrue(api.swift.swift_object_exists(*args))
  270         # Again, for a "non-existent" object
  271         self.assertFalse(api.swift.swift_object_exists(*args))
  272 
  273         self.assertEqual(2, swift_api.head_object.call_count)
  274         swift_api.head_object.assert_has_calls([
  275             mock.call(container.name, obj.name),
  276             mock.call(container.name, obj.name),
  277         ])