"Fossies" - the Fresh Open Source Software Archive

Member "openstack-heat-13.0.0/heat/tests/engine/service/test_threadgroup_mgr.py" (16 Oct 2019, 4921 Bytes) of package /linux/misc/openstack/openstack-heat-13.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "test_threadgroup_mgr.py": 12.0.0_vs_13.0.0.

    1 #
    2 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
    3 #    not use this file except in compliance with the License. You may obtain
    4 #    a copy of the License at
    5 #
    6 #         http://www.apache.org/licenses/LICENSE-2.0
    7 #
    8 #    Unless required by applicable law or agreed to in writing, software
    9 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   10 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   11 #    License for the specific language governing permissions and limitations
   12 #    under the License.
   13 
   14 import eventlet
   15 import mock
   16 
   17 from oslo_context import context
   18 
   19 from heat.engine import service
   20 from heat.tests import common
   21 
   22 
   23 class ThreadGroupManagerTest(common.HeatTestCase):
   24 
   25     def setUp(self):
   26         super(ThreadGroupManagerTest, self).setUp()
   27         self.f = 'function'
   28         self.fargs = ('spam', 'ham', 'eggs')
   29         self.fkwargs = {'foo': 'bar'}
   30         self.cnxt = 'ctxt'
   31         self.engine_id = 'engine_id'
   32         self.stack = mock.Mock()
   33         self.lock_mock = mock.Mock()
   34         self.stlock_mock = self.patch('heat.engine.service.stack_lock')
   35         self.stlock_mock.StackLock.return_value = self.lock_mock
   36         self.tg_mock = mock.Mock()
   37         self.thg_mock = self.patch('heat.engine.service.threadgroup')
   38         self.thg_mock.ThreadGroup.return_value = self.tg_mock
   39         self.cfg_mock = self.patch('heat.engine.service.cfg')
   40 
   41     def test_tgm_start_with_lock(self):
   42         thm = service.ThreadGroupManager()
   43         with self.patchobject(thm, 'start_with_acquired_lock'):
   44             mock_thread_lock = mock.Mock()
   45             mock_thread_lock.__enter__ = mock.Mock(return_value=None)
   46             mock_thread_lock.__exit__ = mock.Mock(return_value=None)
   47             self.lock_mock.thread_lock.return_value = mock_thread_lock
   48             thm.start_with_lock(self.cnxt, self.stack, self.engine_id, self.f,
   49                                 *self.fargs, **self.fkwargs)
   50             self.stlock_mock.StackLock.assert_called_with(self.cnxt,
   51                                                           self.stack.id,
   52                                                           self.engine_id)
   53 
   54             thm.start_with_acquired_lock.assert_called_once_with(
   55                 self.stack, self.lock_mock,
   56                 self.f, *self.fargs, **self.fkwargs)
   57 
   58     def test_tgm_start(self):
   59         stack_id = 'test'
   60 
   61         thm = service.ThreadGroupManager()
   62         ret = thm.start(stack_id, self.f, *self.fargs, **self.fkwargs)
   63 
   64         self.assertEqual(self.tg_mock, thm.groups['test'])
   65         self.tg_mock.add_thread.assert_called_with(
   66             thm._start_with_trace, context.get_current(), None,
   67             self.f, *self.fargs, **self.fkwargs)
   68         self.assertEqual(ret, self.tg_mock.add_thread())
   69 
   70     def test_tgm_add_timer(self):
   71         stack_id = 'test'
   72 
   73         thm = service.ThreadGroupManager()
   74         thm.add_timer(stack_id, self.f, *self.fargs, **self.fkwargs)
   75 
   76         self.assertEqual(self.tg_mock, thm.groups[stack_id])
   77         self.tg_mock.add_timer.assert_called_with(
   78             self.cfg_mock.CONF.periodic_interval,
   79             self.f, None, *self.fargs, **self.fkwargs)
   80 
   81     def test_tgm_add_msg_queue(self):
   82         stack_id = 'add_msg_queues_test'
   83         e1, e2 = mock.Mock(), mock.Mock()
   84         thm = service.ThreadGroupManager()
   85         thm.add_msg_queue(stack_id, e1)
   86         thm.add_msg_queue(stack_id, e2)
   87         self.assertEqual([e1, e2], thm.msg_queues[stack_id])
   88 
   89     def test_tgm_remove_msg_queue(self):
   90         stack_id = 'add_msg_queues_test'
   91         e1, e2 = mock.Mock(), mock.Mock()
   92         thm = service.ThreadGroupManager()
   93         thm.add_msg_queue(stack_id, e1)
   94         thm.add_msg_queue(stack_id, e2)
   95         thm.remove_msg_queue(None, stack_id, e2)
   96         self.assertEqual([e1], thm.msg_queues[stack_id])
   97         thm.remove_msg_queue(None, stack_id, e1)
   98         self.assertNotIn(stack_id, thm.msg_queues)
   99 
  100     def test_tgm_send(self):
  101         stack_id = 'send_test'
  102         e1, e2 = mock.MagicMock(), mock.Mock()
  103         thm = service.ThreadGroupManager()
  104         thm.add_msg_queue(stack_id, e1)
  105         thm.add_msg_queue(stack_id, e2)
  106         thm.send(stack_id, 'test_message')
  107 
  108 
  109 class ThreadGroupManagerStopTest(common.HeatTestCase):
  110 
  111     def test_tgm_stop(self):
  112         stack_id = 'test'
  113         done = []
  114 
  115         def function():
  116             while True:
  117                 eventlet.sleep()
  118 
  119         def linked(gt, thread):
  120             for i in range(10):
  121                 eventlet.sleep()
  122             done.append(thread)
  123 
  124         thm = service.ThreadGroupManager()
  125         thm.add_msg_queue(stack_id, mock.Mock())
  126         thread = thm.start(stack_id, function)
  127         thread.link(linked, thread)
  128 
  129         thm.stop(stack_id)
  130 
  131         self.assertIn(thread, done)
  132         self.assertNotIn(stack_id, thm.groups)
  133         self.assertNotIn(stack_id, thm.msg_queues)