"Fossies" - the Fresh Open Source Software Archive

Member "openstack-heat-13.0.0/heat/tests/engine/service/test_stack_snapshot.py" (16 Oct 2019, 11647 Bytes) of package /linux/misc/openstack/openstack-heat-13.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_stack_snapshot.py": 12.0.0_vs_13.0.0.

    1 # Licensed under the Apache License, Version 2.0 (the "License"); you may
    2 # not use this file except in compliance with the License. You may obtain
    3 # a copy of the License at
    4 #
    5 #         http://www.apache.org/licenses/LICENSE-2.0
    6 #
    7 # Unless required by applicable law or agreed to in writing, software
    8 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    9 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   10 # License for the specific language governing permissions and limitations
   11 # under the License.
   12 
   13 import uuid
   14 
   15 import mock
   16 from oslo_messaging.rpc import dispatcher
   17 import six
   18 
   19 from heat.common import exception
   20 from heat.common import template_format
   21 from heat.engine import service
   22 from heat.engine import stack
   23 from heat.objects import snapshot as snapshot_objects
   24 from heat.tests import common
   25 from heat.tests.engine import tools
   26 from heat.tests import utils
   27 
   28 
   29 class SnapshotServiceTest(common.HeatTestCase):
   30     # TODO(Qiming): Rework this test to handle OS::Nova::Server which
   31     # has a real snapshot support.
   32     def setUp(self):
   33         super(SnapshotServiceTest, self).setUp()
   34         self.ctx = utils.dummy_context()
   35 
   36         self.engine = service.EngineService('a-host', 'a-topic')
   37         self.engine.thread_group_mgr = service.ThreadGroupManager()
   38 
   39     def _create_stack(self, stack_name, files=None):
   40         t = template_format.parse(tools.wp_template)
   41         stk = utils.parse_stack(t, stack_name=stack_name, files=files)
   42         stk.state_set(stk.CREATE, stk.COMPLETE, 'mock completion')
   43 
   44         return stk
   45 
   46     def test_show_snapshot_not_found(self):
   47         stk = self._create_stack('stack_snapshot_not_found')
   48         snapshot_id = str(uuid.uuid4())
   49         ex = self.assertRaises(dispatcher.ExpectedException,
   50                                self.engine.show_snapshot,
   51                                self.ctx, stk.identifier(),
   52                                snapshot_id)
   53         expected = 'Snapshot with id %s not found' % snapshot_id
   54         self.assertEqual(exception.NotFound, ex.exc_info[0])
   55         self.assertIn(expected, six.text_type(ex.exc_info[1]))
   56 
   57     def test_show_snapshot_not_belong_to_stack(self):
   58         stk1 = self._create_stack('stack_snaphot_not_belong_to_stack_1')
   59         stk1._persist_state()
   60         snapshot1 = self.engine.stack_snapshot(
   61             self.ctx, stk1.identifier(), 'snap1')
   62         self.engine.thread_group_mgr.groups[stk1.id].wait()
   63         snapshot_id = snapshot1['id']
   64 
   65         stk2 = self._create_stack('stack_snaphot_not_belong_to_stack_2')
   66         stk2._persist_state()
   67         ex = self.assertRaises(dispatcher.ExpectedException,
   68                                self.engine.show_snapshot,
   69                                self.ctx, stk2.identifier(),
   70                                snapshot_id)
   71         expected = ('The Snapshot (%(snapshot)s) for Stack (%(stack)s) '
   72                     'could not be found') % {'snapshot': snapshot_id,
   73                                              'stack': stk2.name}
   74         self.assertEqual(exception.SnapshotNotFound, ex.exc_info[0])
   75         self.assertIn(expected, six.text_type(ex.exc_info[1]))
   76 
   77     @mock.patch.object(stack.Stack, 'load')
   78     def test_create_snapshot(self, mock_load):
   79         files = {'a_file': 'the contents'}
   80         stk = self._create_stack('stack_snapshot_create', files=files)
   81         mock_load.return_value = stk
   82 
   83         snapshot = self.engine.stack_snapshot(
   84             self.ctx, stk.identifier(), 'snap1')
   85         self.assertIsNotNone(snapshot['id'])
   86         self.assertIsNotNone(snapshot['creation_time'])
   87         self.assertEqual('snap1', snapshot['name'])
   88         self.assertEqual("IN_PROGRESS", snapshot['status'])
   89         self.engine.thread_group_mgr.groups[stk.id].wait()
   90         snapshot = self.engine.show_snapshot(
   91             self.ctx, stk.identifier(), snapshot['id'])
   92         self.assertEqual("COMPLETE", snapshot['status'])
   93         self.assertEqual("SNAPSHOT", snapshot['data']['action'])
   94         self.assertEqual("COMPLETE", snapshot['data']['status'])
   95         self.assertEqual(files, snapshot['data']['files'])
   96         self.assertEqual(stk.id, snapshot['data']['id'])
   97         self.assertIsNotNone(stk.updated_time)
   98         self.assertIsNotNone(snapshot['creation_time'])
   99         mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
  100 
  101     @mock.patch.object(stack.Stack, 'load')
  102     def test_create_snapshot_action_in_progress(self, mock_load):
  103         stack_name = 'stack_snapshot_action_in_progress'
  104         stk = self._create_stack(stack_name)
  105         mock_load.return_value = stk
  106 
  107         stk.state_set(stk.UPDATE, stk.IN_PROGRESS, 'test_override')
  108         ex = self.assertRaises(dispatcher.ExpectedException,
  109                                self.engine.stack_snapshot,
  110                                self.ctx, stk.identifier(), 'snap_none')
  111         self.assertEqual(exception.ActionInProgress, ex.exc_info[0])
  112         msg = ("Stack %(stack)s already has an action (%(action)s) "
  113                "in progress.") % {'stack': stack_name, 'action': stk.action}
  114         self.assertEqual(msg, six.text_type(ex.exc_info[1]))
  115 
  116         mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
  117 
  118     @mock.patch.object(stack.Stack, 'load')
  119     def test_delete_snapshot_not_found(self, mock_load):
  120         stk = self._create_stack('stack_snapshot_delete_not_found')
  121         mock_load.return_value = stk
  122 
  123         snapshot_id = str(uuid.uuid4())
  124         ex = self.assertRaises(dispatcher.ExpectedException,
  125                                self.engine.delete_snapshot,
  126                                self.ctx, stk.identifier(), snapshot_id)
  127         self.assertEqual(exception.NotFound, ex.exc_info[0])
  128 
  129         mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
  130 
  131     @mock.patch.object(stack.Stack, 'load')
  132     def test_delete_snapshot_not_belong_to_stack(self, mock_load):
  133         stk1 = self._create_stack('stack_snapshot_delete_not_belong_1')
  134         mock_load.return_value = stk1
  135 
  136         snapshot1 = self.engine.stack_snapshot(
  137             self.ctx, stk1.identifier(), 'snap1')
  138         self.engine.thread_group_mgr.groups[stk1.id].wait()
  139         snapshot_id = snapshot1['id']
  140 
  141         mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
  142         mock_load.reset_mock()
  143 
  144         stk2 = self._create_stack('stack_snapshot_delete_not_belong_2')
  145         mock_load.return_value = stk2
  146 
  147         ex = self.assertRaises(dispatcher.ExpectedException,
  148                                self.engine.delete_snapshot,
  149                                self.ctx,
  150                                stk2.identifier(),
  151                                snapshot_id)
  152         expected = ('The Snapshot (%(snapshot)s) for Stack (%(stack)s) '
  153                     'could not be found') % {'snapshot': snapshot_id,
  154                                              'stack': stk2.name}
  155         self.assertEqual(exception.SnapshotNotFound, ex.exc_info[0])
  156         self.assertIn(expected, six.text_type(ex.exc_info[1]))
  157 
  158         mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
  159         mock_load.reset_mock()
  160 
  161     @mock.patch.object(stack.Stack, 'load')
  162     def test_delete_snapshot_in_progress(self, mock_load):
  163         # can not delete the snapshot in snapshotting
  164         stk = self._create_stack('test_delete_snapshot_in_progress')
  165         mock_load.return_value = stk
  166         snapshot = mock.Mock()
  167         snapshot.id = str(uuid.uuid4())
  168         snapshot.status = 'IN_PROGRESS'
  169         self.patchobject(snapshot_objects.Snapshot,
  170                          'get_snapshot_by_stack').return_value = snapshot
  171         ex = self.assertRaises(dispatcher.ExpectedException,
  172                                self.engine.delete_snapshot,
  173                                self.ctx, stk.identifier(), snapshot.id)
  174         msg = 'Deleting in-progress snapshot is not supported'
  175         self.assertIn(msg, six.text_type(ex.exc_info[1]))
  176         self.assertEqual(exception.NotSupported, ex.exc_info[0])
  177 
  178     @mock.patch.object(stack.Stack, 'load')
  179     def test_delete_snapshot(self, mock_load):
  180         stk = self._create_stack('stack_snapshot_delete_normal')
  181         mock_load.return_value = stk
  182 
  183         snapshot = self.engine.stack_snapshot(
  184             self.ctx, stk.identifier(), 'snap1')
  185         self.engine.thread_group_mgr.groups[stk.id].wait()
  186         snapshot_id = snapshot['id']
  187         self.engine.delete_snapshot(self.ctx, stk.identifier(), snapshot_id)
  188         self.engine.thread_group_mgr.groups[stk.id].wait()
  189 
  190         ex = self.assertRaises(dispatcher.ExpectedException,
  191                                self.engine.show_snapshot, self.ctx,
  192                                stk.identifier(), snapshot_id)
  193         self.assertEqual(exception.NotFound, ex.exc_info[0])
  194 
  195         self.assertEqual(2, mock_load.call_count)
  196 
  197     @mock.patch.object(stack.Stack, 'load')
  198     def test_list_snapshots(self, mock_load):
  199         stk = self._create_stack('stack_snapshot_list')
  200         mock_load.return_value = stk
  201 
  202         snapshot = self.engine.stack_snapshot(
  203             self.ctx, stk.identifier(), 'snap1')
  204         self.assertIsNotNone(snapshot['id'])
  205         self.assertEqual("IN_PROGRESS", snapshot['status'])
  206         self.engine.thread_group_mgr.groups[stk.id].wait()
  207 
  208         snapshots = self.engine.stack_list_snapshots(
  209             self.ctx, stk.identifier())
  210         expected = {
  211             "id": snapshot["id"],
  212             "name": "snap1",
  213             "status": "COMPLETE",
  214             "status_reason": "Stack SNAPSHOT completed successfully",
  215             "data": stk.prepare_abandon(),
  216             "creation_time": snapshot['creation_time']}
  217 
  218         self.assertEqual([expected], snapshots)
  219         mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
  220 
  221     @mock.patch.object(stack.Stack, 'load')
  222     def test_restore_snapshot(self, mock_load):
  223         stk = self._create_stack('stack_snapshot_restore_normal')
  224         mock_load.return_value = stk
  225 
  226         snapshot = self.engine.stack_snapshot(
  227             self.ctx, stk.identifier(), 'snap1')
  228         self.engine.thread_group_mgr.groups[stk.id].wait()
  229         snapshot_id = snapshot['id']
  230         self.engine.stack_restore(self.ctx, stk.identifier(), snapshot_id)
  231         self.engine.thread_group_mgr.groups[stk.id].wait()
  232         self.assertEqual((stk.RESTORE, stk.COMPLETE), stk.state)
  233         self.assertEqual(2, mock_load.call_count)
  234 
  235     @mock.patch.object(stack.Stack, 'load')
  236     def test_restore_snapshot_other_stack(self, mock_load):
  237         stk1 = self._create_stack('stack_snapshot_restore_other_stack_1')
  238         mock_load.return_value = stk1
  239 
  240         snapshot1 = self.engine.stack_snapshot(
  241             self.ctx, stk1.identifier(), 'snap1')
  242         self.engine.thread_group_mgr.groups[stk1.id].wait()
  243         snapshot_id = snapshot1['id']
  244 
  245         mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
  246         mock_load.reset_mock()
  247 
  248         stk2 = self._create_stack('stack_snapshot_restore_other_stack_1')
  249         mock_load.return_value = stk2
  250 
  251         ex = self.assertRaises(dispatcher.ExpectedException,
  252                                self.engine.stack_restore,
  253                                self.ctx, stk2.identifier(),
  254                                snapshot_id)
  255         expected = ('The Snapshot (%(snapshot)s) for Stack (%(stack)s) '
  256                     'could not be found') % {'snapshot': snapshot_id,
  257                                              'stack': stk2.name}
  258         self.assertEqual(exception.SnapshotNotFound, ex.exc_info[0])
  259         self.assertIn(expected, six.text_type(ex.exc_info[1]))
  260 
  261         mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)