"Fossies" - the Fresh Open Source Software Archive

Member "freezer-10.0.0/freezer/tests/unit/openstack/test_restore.py" (14 Apr 2021, 7820 Bytes) of package /linux/misc/openstack/freezer-10.0.0.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. See also the latest Fossies "Diffs" side-by-side code changes report for "test_restore.py": 9.0.0_vs_10.0.0.

    1 # (c) Copyright 2014,2015 Hewlett-Packard Development Company, L.P.
    2 # (c) Copyright 2016 Hewlett-Packard Enterprise Development Company, L.P
    3 # Licensed under the Apache License, Version 2.0 (the "License");
    4 # you may not use this file except in compliance with the License.
    5 # You may obtain a copy of the License at
    6 #
    7 #     http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   12 # See the License for the specific language governing permissions and
   13 # limitations under the License.
   14 
   15 
   16 """
   17 Freezer restore.py related tests
   18 """
   19 from unittest import mock
   20 
   21 from freezer.openstack import restore
   22 from freezer.tests import commons
   23 
   24 
   25 class Image(object):
   26     def __init__(self):
   27         self.id = 'test'
   28 
   29 
   30 class TestRestore(commons.FreezerBaseTestCase):
   31     def setUp(self):
   32         super(TestRestore, self).setUp()
   33 
   34     def test_restore_cinder_by_glance(self):
   35         backup_opt = commons.BackupOpt1()
   36         restore.RestoreOs(backup_opt.client_manager, backup_opt.container,
   37                           backup_opt.storage)
   38 
   39     def test_restore_cinder_by_glance_from_local(self):
   40         backup_opt = commons.BackupOpt1()
   41         restore.RestoreOs(backup_opt.client_manager, backup_opt.container,
   42                           'local')
   43 
   44     def test_restore_cinder_with_backup_id(self):
   45         backup_opt = commons.BackupOpt1()
   46         ros = restore.RestoreOs(backup_opt.client_manager,
   47                                 backup_opt.container, backup_opt.storage)
   48         ros.restore_cinder(35, 34, 33)
   49 
   50     def test_restore_cinder_without_backup_id(self):
   51         backup_opt = commons.BackupOpt1()
   52         ros = restore.RestoreOs(backup_opt.client_manager,
   53                                 backup_opt.container, backup_opt.storage)
   54         ros.restore_cinder(35, 34)
   55 
   56     def test_restore_nova(self):
   57         backup_opt = commons.BackupOpt1()
   58         restore.RestoreOs(backup_opt.client_manager, backup_opt.container,
   59                           backup_opt.storage)
   60 
   61     def test_restore_nova_from_local(self):
   62         backup_opt = commons.BackupOpt1()
   63         restore.RestoreOs(backup_opt.client_manager, backup_opt.container,
   64                           'local')
   65 
   66     def test_restore_cinder(self):
   67         storage = mock.MagicMock()
   68         storage.type = 'swift'
   69         backup1 = mock.MagicMock()
   70         backup1.created_at = "2020-08-31T16:32:30"
   71         backup2 = mock.MagicMock()
   72         backup2.created_at = "2020-08-31T16:32:31"
   73         cinder_client = mock.MagicMock()
   74         cinder_client.backups.list.return_value = [backup1, backup2]
   75         cinder_client.restores.restore.return_value = 'test'
   76         client_manager = mock.MagicMock()
   77         client_manager.get_cinder.return_value = cinder_client
   78         restore_os = restore.RestoreOs(client_manager, '/root/test/', storage)
   79         result = restore_os.restore_cinder(restore_from_timestamp=1598862750)
   80         self.assertIsNone(result)
   81         result = restore_os.restore_cinder(restore_from_timestamp=1598862749)
   82         self.assertIsNone(result)
   83 
   84     @mock.patch('shutil.rmtree')
   85     @mock.patch('tempfile.mkdtemp')
   86     @mock.patch('oslo_serialization.jsonutils.load')
   87     @mock.patch('os.listdir')
   88     @mock.patch('builtins.open')
   89     @mock.patch('freezer.utils.utils.ReSizeStream')
   90     @mock.patch('freezer.utils.utils.S3ResponseStream')
   91     def test_create_image(self, mock_s3stream, mock_resize, mock_open,
   92                           mock_list, mock_load, mock_mkdtemp, mock_rmtree):
   93         image = Image()
   94         storage = mock.MagicMock()
   95         storage.type = 's3'
   96         storage.get_object.return_value = {'Body': '{"test": "test_info"}',
   97                                            'ContentLength': 3}
   98         storage.get_object_prefix.return_value = 'test'
   99         storage.list_all_objects.return_value = [{'Key': '/12345'},
  100                                                  {'Key': '/12346'}]
  101         client_manager = mock.MagicMock()
  102         client_manager.create_image.return_value = image
  103         restore_os = restore.RestoreOs(client_manager, '/root/test/', storage)
  104         result1, result2 = restore_os._create_image('/root', 12344)
  105         self.assertEqual(result1, {"test": "test_info"})
  106         self.assertEqual(result2, image)
  107 
  108         storage.get_object_prefix.return_value = ''
  109         result1, result2 = restore_os._create_image('/root', 12344)
  110         self.assertEqual(result1, {"test": "test_info"})
  111         self.assertEqual(result2, image)
  112 
  113         mock_open.side_effect = Exception("error")
  114         storage.type = 'local'
  115         mock_list.return_value = ['12345']
  116         restore_os = restore.RestoreOs(client_manager, '/root/test', storage)
  117         try:
  118             restore_os._create_image('/root', 12344)
  119         except BaseException as e:
  120             self.assertEqual(str(e), "Failed to open image file"
  121                                      " /root/test//root/12345//root")
  122 
  123         mock_open.side_effect = 'test'
  124         mock_load.return_value = 'test'
  125         result1, result2 = restore_os._create_image('/root', 12344)
  126         self.assertEqual(result1, 'test')
  127         self.assertEqual(result2, image)
  128 
  129         storage.type = 'ssh'
  130         storage.open.side_effect = Exception("error")
  131         storage.listdir.return_value = ['12345']
  132         restore_os = restore.RestoreOs(client_manager, '/root/test', storage)
  133         try:
  134             restore_os._create_image('/root', 12344)
  135         except BaseException as e:
  136             self.assertEqual(str(e), "Failed to open remote image file "
  137                                      "/root/test//root/12345//root")
  138 
  139         storage.open.side_effect = 'test'
  140         storage.read_metadata_file.return_value = '{"test": "test_info"}'
  141         restore_os = restore.RestoreOs(client_manager, '/root/test', storage)
  142         result1, result2 = restore_os._create_image('/root', 12344)
  143         self.assertEqual(result1, {"test": "test_info"})
  144         self.assertEqual(result2, image)
  145 
  146         storage.type = 'ftp'
  147         storage.listdir.return_value = ['12345']
  148         mock_mkdtemp.side_effect = Exception('error')
  149         restore_os = restore.RestoreOs(client_manager, '/root/test', storage)
  150         try:
  151             restore_os._create_image('/root', 12344)
  152         except Exception as e:
  153             self.assertEqual(str(e), "Unable to create a tmp directory")
  154 
  155         mock_mkdtemp.side_effect = "success"
  156         mock_rmtree.return_value = "success"
  157         mock_open.side_effect = 'test'
  158         result1, result2 = restore_os._create_image('/root', 12344)
  159         self.assertEqual(result1, 'test')
  160         self.assertEqual(result2, image)
  161 
  162     def test_get_backups_exception(self):
  163         storage = mock.MagicMock()
  164         storage.type = 'ss3'
  165         client_manager = mock.MagicMock()
  166         restore_os = restore.RestoreOs(client_manager, '/root/test/', storage)
  167         try:
  168             restore_os._get_backups('/root', 12347)
  169         except BaseException as e:
  170             self.assertEqual(str(e), "ss3 storage type is not supported at the"
  171                                      " moment. Try local, SWIFT, SSH(SFTP),"
  172                                      " FTP or FTPS ")
  173 
  174         storage = mock.MagicMock()
  175         storage.type = 's3'
  176         storage.list_all_objects.return_value = [{'Key': '/12345'},
  177                                                  {'Key': '/12346'}]
  178         restore_os = restore.RestoreOs(client_manager, '/root/test/', storage)
  179         try:
  180             restore_os._get_backups('/root', 12347)
  181         except BaseException as e:
  182             self.assertEqual(str(e), "Cannot find backups for"
  183                                      " path: root/test///root")