"Fossies" - the Fresh Open Source Software Archive

Member "buildbot-2.3.1/buildbot/test/unit/test_changes_base.py" (23 May 2019, 8727 Bytes) of package /linux/misc/buildbot-2.3.1.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. See also the last Fossies "Diffs" side-by-side code changes report for "test_changes_base.py": 2.1.0_vs_2.2.0.

    1 # This file is part of Buildbot.  Buildbot is free software: you can
    2 # redistribute it and/or modify it under the terms of the GNU General Public
    3 # License as published by the Free Software Foundation, version 2.
    4 #
    5 # This program is distributed in the hope that it will be useful, but WITHOUT
    6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
    7 # FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
    8 # details.
    9 #
   10 # You should have received a copy of the GNU General Public License along with
   11 # this program; if not, write to the Free Software Foundation, Inc., 51
   12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
   13 #
   14 # Copyright Buildbot Team Members
   15 
   16 import mock
   17 
   18 from twisted.internet import defer
   19 from twisted.trial import unittest
   20 
   21 from buildbot.changes import base
   22 from buildbot.test.util import changesource
   23 from buildbot.test.util.misc import TestReactorMixin
   24 
   25 
   26 class TestChangeSource(changesource.ChangeSourceMixin,
   27                        TestReactorMixin,
   28                        unittest.TestCase):
   29     timeout = 120
   30 
   31     class Subclass(base.ChangeSource):
   32         pass
   33 
   34     @defer.inlineCallbacks
   35     def setUp(self):
   36         self.setUpTestReactor()
   37         yield self.setUpChangeSource()
   38 
   39     def tearDown(self):
   40         return self.tearDownChangeSource()
   41 
   42     @defer.inlineCallbacks
   43     def test_activation(self):
   44         cs = self.Subclass(name="DummyCS")
   45         cs.activate = mock.Mock(return_value=defer.succeed(None))
   46         cs.deactivate = mock.Mock(return_value=defer.succeed(None))
   47 
   48         # set the changesourceid, and claim the changesource on another master
   49         self.attachChangeSource(cs)
   50         self.setChangeSourceToMaster(self.OTHER_MASTER_ID)
   51 
   52         yield cs.startService()
   53         cs.clock.advance(cs.POLL_INTERVAL_SEC / 2)
   54         cs.clock.advance(cs.POLL_INTERVAL_SEC / 5)
   55         cs.clock.advance(cs.POLL_INTERVAL_SEC / 5)
   56         self.assertFalse(cs.activate.called)
   57         self.assertFalse(cs.deactivate.called)
   58         self.assertFalse(cs.active)
   59         self.assertEqual(cs.serviceid, self.DUMMY_CHANGESOURCE_ID)
   60 
   61         # clear that masterid
   62         yield cs.stopService()
   63         self.setChangeSourceToMaster(None)
   64 
   65         yield cs.startService()
   66         cs.clock.advance(cs.POLL_INTERVAL_SEC)
   67         self.assertTrue(cs.activate.called)
   68         self.assertFalse(cs.deactivate.called)
   69         self.assertTrue(cs.active)
   70 
   71         # stop the service and see that deactivate is called
   72         yield cs.stopService()
   73         self.assertTrue(cs.activate.called)
   74         self.assertTrue(cs.deactivate.called)
   75         self.assertFalse(cs.active)
   76 
   77 
   78 class TestPollingChangeSource(changesource.ChangeSourceMixin,
   79                               TestReactorMixin,
   80                               unittest.TestCase):
   81     timeout = 120
   82 
   83     class Subclass(base.PollingChangeSource):
   84         pass
   85 
   86     @defer.inlineCallbacks
   87     def setUp(self):
   88         self.setUpTestReactor()
   89         yield self.setUpChangeSource()
   90 
   91         self.attachChangeSource(self.Subclass(name="DummyCS"))
   92 
   93     def tearDown(self):
   94         return self.tearDownChangeSource()
   95 
   96     @defer.inlineCallbacks
   97     def runClockFor(self, _, secs):
   98         yield self.reactor.pump([1.0] * secs)
   99 
  100     def test_loop_loops(self):
  101         # track when poll() gets called
  102         loops = []
  103         self.changesource.poll = \
  104             lambda: loops.append(self.reactor.seconds())
  105 
  106         self.changesource.pollInterval = 5
  107         self.startChangeSource()
  108         d = defer.Deferred()
  109         d.addCallback(self.runClockFor, 12)
  110 
  111         @d.addCallback
  112         def check(_):
  113             # note that it does *not* poll at time 0
  114             self.assertEqual(loops, [5.0, 10.0])
  115         self.reactor.callWhenRunning(d.callback, None)
  116         return d
  117 
  118     def test_loop_exception(self):
  119         # track when poll() gets called
  120         loops = []
  121 
  122         def poll():
  123             loops.append(self.reactor.seconds())
  124             raise RuntimeError("oh noes")
  125         self.changesource.poll = poll
  126 
  127         self.changesource.pollInterval = 5
  128         self.startChangeSource()
  129 
  130         d = defer.Deferred()
  131         d.addCallback(self.runClockFor, 12)
  132 
  133         @d.addCallback
  134         def check(_):
  135             # note that it keeps looping after error
  136             self.assertEqual(loops, [5.0, 10.0])
  137             self.assertEqual(len(self.flushLoggedErrors(RuntimeError)), 2)
  138         self.reactor.callWhenRunning(d.callback, None)
  139         return d
  140 
  141     def test_poll_only_if_activated(self):
  142         """The polling logic only applies if the source actually starts!"""
  143 
  144         self.setChangeSourceToMaster(self.OTHER_MASTER_ID)
  145 
  146         loops = []
  147         self.changesource.poll = \
  148             lambda: loops.append(self.reactor.seconds())
  149 
  150         self.changesource.pollInterval = 5
  151         self.startChangeSource()
  152 
  153         d = defer.Deferred()
  154         d.addCallback(self.runClockFor, 12)
  155 
  156         @d.addCallback
  157         def check(_):
  158             # it doesn't do anything because it was already claimed
  159             self.assertEqual(loops, [])
  160 
  161         self.reactor.callWhenRunning(d.callback, None)
  162         return d
  163 
  164     def test_pollAtLaunch(self):
  165         # track when poll() gets called
  166         loops = []
  167         self.changesource.poll = \
  168             lambda: loops.append(self.reactor.seconds())
  169 
  170         self.changesource.pollInterval = 5
  171         self.changesource.pollAtLaunch = True
  172         self.startChangeSource()
  173 
  174         d = defer.Deferred()
  175         d.addCallback(self.runClockFor, 12)
  176 
  177         @d.addCallback
  178         def check(_):
  179             # note that it *does* poll at time 0
  180             self.assertEqual(loops, [0.0, 5.0, 10.0])
  181         self.reactor.callWhenRunning(d.callback, None)
  182         return d
  183 
  184 
  185 class TestReconfigurablePollingChangeSource(changesource.ChangeSourceMixin,
  186                                             TestReactorMixin,
  187                                             unittest.TestCase):
  188 
  189     class Subclass(base.ReconfigurablePollingChangeSource):
  190         pass
  191 
  192     @defer.inlineCallbacks
  193     def setUp(self):
  194         self.setUpTestReactor()
  195 
  196         yield self.setUpChangeSource()
  197 
  198         self.attachChangeSource(self.Subclass(name="DummyCS"))
  199 
  200     def tearDown(self):
  201         return self.tearDownChangeSource()
  202 
  203     @defer.inlineCallbacks
  204     def runClockFor(self, secs):
  205         yield self.reactor.pump([1.0] * secs)
  206 
  207     @defer.inlineCallbacks
  208     def test_loop_loops(self):
  209         # track when poll() gets called
  210         loops = []
  211         self.changesource.poll = \
  212             lambda: loops.append(self.reactor.seconds())
  213 
  214         yield self.startChangeSource()
  215         yield self.changesource.reconfigServiceWithSibling(self.Subclass(
  216             name="DummyCS", pollInterval=5, pollAtLaunch=False))
  217 
  218         yield self.runClockFor(12)
  219         # note that it does *not* poll at time 0
  220         self.assertEqual(loops, [5.0, 10.0])
  221 
  222     @defer.inlineCallbacks
  223     def test_loop_exception(self):
  224         # track when poll() gets called
  225         loops = []
  226 
  227         def poll():
  228             loops.append(self.reactor.seconds())
  229             raise RuntimeError("oh noes")
  230         self.changesource.poll = poll
  231 
  232         yield self.startChangeSource()
  233         yield self.changesource.reconfigServiceWithSibling(self.Subclass(
  234             name="DummyCS", pollInterval=5, pollAtLaunch=False))
  235 
  236         yield self.runClockFor(12)
  237         # note that it keeps looping after error
  238         self.assertEqual(loops, [5.0, 10.0])
  239         self.assertEqual(len(self.flushLoggedErrors(RuntimeError)), 2)
  240 
  241     @defer.inlineCallbacks
  242     def test_poll_only_if_activated(self):
  243         """The polling logic only applies if the source actually starts!"""
  244 
  245         self.setChangeSourceToMaster(self.OTHER_MASTER_ID)
  246 
  247         loops = []
  248         self.changesource.poll = \
  249             lambda: loops.append(self.reactor.seconds())
  250 
  251         yield self.startChangeSource()
  252         yield self.changesource.reconfigServiceWithSibling(self.Subclass(
  253             name="DummyCS", pollInterval=5, pollAtLaunch=False))
  254 
  255         yield self.runClockFor(12)
  256 
  257         # it doesn't do anything because it was already claimed
  258         self.assertEqual(loops, [])
  259 
  260     @defer.inlineCallbacks
  261     def test_pollAtLaunch(self):
  262         # track when poll() gets called
  263         loops = []
  264         self.changesource.poll = \
  265             lambda: loops.append(self.reactor.seconds())
  266         yield self.startChangeSource()
  267         yield self.changesource.reconfigServiceWithSibling(self.Subclass(
  268             name="DummyCS", pollInterval=5, pollAtLaunch=True))
  269 
  270         yield self.runClockFor(12)
  271 
  272         # note that it *does* poll at time 0
  273         self.assertEqual(loops, [0.0, 5.0, 10.0])