Startup and Background Tasks in Chandler
The osaf.startup module provides an API for invoking repository items when the application has been started. For our examples, we'll be using a null repository view, rv:

    >>> from repository.persistence.RepositoryView import NullRepositoryView
    >>> rv = NullRepositoryView()

In order to run code at application startup, you simply create items of type osaf.startup.Startup in your parcel. Each item's invoke attribute names a function (or non-Item class) that should be invoked at startup. For our example, we'll use sys.stdout.write as the function we want to call:

    >>> from osaf.startup import Startup
    >>> test = Startup("test", invoke="sys.stdout.write", itsView=rv)

When run_startup() is called, the function or class named by the invoke attribute of each Startup item is called with the corresponding item as its only parameter. In our example, this causes the test object to be written to sys.stdout:

    >>> from osaf.startup import run_startup
    >>> run_startup(rv)
    <Startup (new): test ...>

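The lookup code inside run_startup() isn't shown in this document. As a minimal Python 3 sketch, assuming invoke is a dotted path whose longest importable prefix is a module and whose remaining parts are attributes, resolving and calling it could look like this (resolve_dotted_name and invoke_startup are hypothetical helpers, not osaf.startup APIs):

```python
import importlib


def resolve_dotted_name(name):
    """Hypothetical helper: import the longest importable module prefix of
    ``name`` (e.g. "sys" for "sys.stdout.write") and walk the rest as attributes."""
    parts = name.split(".")
    for i in range(len(parts), 0, -1):
        try:
            obj = importlib.import_module(".".join(parts[:i]))
        except ImportError:
            continue  # not a module; try a shorter prefix
        for attr in parts[i:]:
            obj = getattr(obj, attr)
        return obj
    raise ImportError("cannot resolve %r" % name)


def invoke_startup(item, invoke_name):
    """Hypothetical helper: call the named target with the item as its only argument."""
    target = resolve_dotted_name(invoke_name)
    return target(item)
```

With this approach, "osaf.tests.TestStartups.my_thread" would import osaf.tests.TestStartups and then fetch my_thread from it, which is why the later examples attach their functions to that module.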
Please note that startup invocation is intended for tasks like adding servers to the Twisted reactor, starting threads, or other in-process Python operations, based on data in the repository. It should not be used to modify repository items, as this may indirectly lead to merge conflicts with other threads. Any needed repository items should be created or modified via your parcel's installParcel() hook, rather than via startup items.
Individual startups can be enabled or disabled using their active boolean attribute:

    >>> test.active = False
    >>> run_startup(rv)  # Item is disabled, so nothing happens

You can create subclasses of Startup if your startup code needs stored configuration of some kind. Typically, you will override the onStart() method of Startup in your subclass so that it performs the desired behavior using attributes of the item. For example:

    >>> from application import schema
    >>> class StartupMessage(Startup):
    ...     message = schema.One(schema.Text, initialValue=u"Hello, world!")
    ...     def onStart(self):
    ...         print self.message.encode('utf8')
    >>> hello = StartupMessage(itsView=rv)
    >>> run_startup(rv)
    Hello, world!

Notice that we did not invoke the superclass onStart() method, because we don't want the invoke attribute to take effect.
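To make the override pattern concrete outside the repository, here is a plain-Python 3 sketch. It assumes, as the sentence above implies, that the base onStart() is what calls the invoke target; StartupSketch and MessageSketch are hypothetical stand-ins, not schema items:

```python
class StartupSketch(object):
    """Hypothetical stand-in for Startup: by default, onStart() hands the
    item to the callable stored in ``invoke``."""

    def __init__(self, invoke=None, active=True):
        self.invoke = invoke
        self.active = active

    def onStart(self):
        if self.invoke is not None:
            return self.invoke(self)


class MessageSketch(StartupSketch):
    """Overrides onStart() without calling the superclass version,
    so any ``invoke`` value is ignored."""

    def __init__(self, message=u"Hello, world!", **kw):
        StartupSketch.__init__(self, **kw)
        self.message = message
        self.output = []  # collected instead of printed, so it is easy to inspect

    def onStart(self):
        self.output.append(self.message)
```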
By default, startup items start in no particular order. But sometimes a startup item needs another item to be started first. The requires attribute can be used to reference other startup items that should be started first:

    >>> goodbye = StartupMessage(
    ...     message=u"Goodbye, world!",
    ...     requires=[hello],
    ...     itsView=rv
    ... )
    >>> run_startup(rv)
    Hello, world!
    Goodbye, world!
    >>> goodbye.requires = []
    >>> hello.requires = [goodbye]
    >>> run_startup(rv)
    Goodbye, world!
    Hello, world!

Note that a startup will not run unless all of its required items have been started:

    >>> goodbye.active = False
    >>> run_startup(rv)  # hello needs goodbye, so neither runs here

This also means that creating a requirements loop (where A requires B and B requires A) will prevent all items in the loop from starting:

    >>> goodbye.active = True
    >>> goodbye.requires = [hello]
    >>> run_startup(rv)  # co-dependents won't be started

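The three rules above (requirements start first, an inactive requirement blocks its dependents, and a loop blocks everything in it) can be captured in a short Python 3 sketch. This is not Chandler's algorithm; it assumes items are given as a hypothetical dict mapping name to (active, list of required names), and independent items simply keep dict order:

```python
def startup_order(items):
    """Return the names of items that would start, in a valid order.

    ``items`` maps name -> (active, [names of required items]).  An item
    starts only once every item it requires has started, so inactive
    requirements and requirement loops keep their dependents from starting.
    """
    started = []
    done = set()
    pending = [name for name, (active, _) in items.items() if active]
    progress = True
    while progress:
        progress = False
        for name in list(pending):  # iterate over a copy; we remove as we go
            requires = items[name][1]
            if all(req in done for req in requires):
                started.append(name)
                done.add(name)
                pending.remove(name)
                progress = True
    return started
```

For example, `{"hello": (True, ["goodbye"]), "goodbye": (True, [])}` starts goodbye and then hello, while making the two require each other starts neither.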
To run your startup code in a separate thread, you can create an item of type Thread instead of Startup. The object named by the Thread item's invoke attribute will be called in a new thread with its own repository view, but in all other respects a Thread is the same as a Startup.
Here's an example routine that we might run in a thread. It waits for a start flag to be set, then sets a finished flag and exits:

    >>> start = False
    >>> finished = False
    >>> def my_thread(thread_item):
    ...     global start, finished
    ...     while not start: pass
    ...     finished = True

Normally, if you wanted to run this code in a thread at startup, you would just put it in a module and then use its name (e.g. some_module.my_thread) as the invoke attribute of a Thread item. But for demonstration purposes, we don't want to have to put this code in a separate module, so we'll import an already existing module and insert this routine into its namespace temporarily, so that our Thread will be able to find it:

    >>> from osaf.tests import TestStartups
    >>> TestStartups.my_thread = my_thread

And now we can make a Thread that will run it:

    >>> from osaf.startup import Thread
    >>> t = Thread(
    ...     "my_name_here", invoke="osaf.tests.TestStartups.my_thread", itsView=rv
    ... )
    >>> t
    <Thread (new): my_name_here ...>
    >>> run_startup(rv)

So now, the thread has been started:

    >>> import threading
    >>> threading.enumerate()
    [...<RepositoryThread(//userdata/my_name_here, started daemon)>...]

but it's looping, waiting for the start flag to be set:

    >>> finished
    False

Because this and some of our later examples will need to wait a few moments for a thread to finish, we'll need a simple utility routine that sleeps until some condition occurs, or raises an error if it seems to be taking too long:

    >>> from time import sleep
    >>> def wait_until(condition):
    ...     ms = 1
    ...     while ms<=10000:   # Wait a maximum of 11.111 seconds
    ...         if condition():
    ...             return
    ...         sleep(ms/1000.0)   # sleep for ms milliseconds
    ...         ms *= 10    # wait 10 times as long on the next try
    ...     raise AssertionError("Condition didn't happen before timeout")

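The maximum wait comes from the sleep schedule 1 + 10 + 100 + 1000 + 10000 ms = 11111 ms. Note that the routine above never checks the condition after its final 10-second sleep. A Python 3 variant with the backoff made configurable and one last check before giving up might look like this (the first_ms, factor and max_ms parameters are additions for illustration):

```python
import time


def wait_until(condition, first_ms=1, factor=10, max_ms=10000):
    """Poll ``condition`` with exponentially growing sleeps (1, 10, 100,
    1000 and 10000 ms by default, 11.111 s in total), checking once more
    after the last sleep, and raise AssertionError if it never holds."""
    ms = first_ms
    while True:
        if condition():
            return
        if ms > max_ms:
            break
        time.sleep(ms / 1000.0)  # sleep for ms milliseconds
        ms *= factor             # back off before the next check
    raise AssertionError("Condition didn't happen before timeout")
```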
So, now that we have a way to wait for the thread, let's set the start flag and wait until the thread finishes:

    >>> start = True
    >>> wait_until(lambda: finished)  # let the thread finish

And now the finished flag should have been set by the thread:

    >>> finished
    True

We don't want this startup.Thread to be active in subsequent examples, so we'll disable it now:

    >>> t.active = False  # don't start `t` in future tests

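The `while not start: pass` loop in my_thread is fine for a quick doctest, but it keeps a CPU busy while waiting. As a Python 3 sketch, the same start/finished handshake can use threading.Event so the worker blocks instead of spinning. Here a plain threading.Thread stands in for a Chandler Thread item, so no repository view is involved and thread_item is just None:

```python
import threading

start = threading.Event()
finished = threading.Event()


def my_thread(thread_item):
    # Block (without spinning) until the start flag is set, then report completion.
    start.wait()
    finished.set()


worker = threading.Thread(target=my_thread, args=(None,))
worker.daemon = True
worker.start()
```

Calling `start.set()` releases the worker, and `finished.wait(timeout)` then replaces the polling done by wait_until.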
Chandler needs to run Twisted's reactor in a thread, so that it can run independently of the GUI (which could otherwise block its execution). The osaf.startup module provides a few APIs for this:

    >>> from osaf.startup import get_reactor_thread, run_reactor, stop_reactor

get_reactor_thread() returns the threading.Thread object that the reactor is running in (assuming it was started using ReactorThread), or None:

    >>> print get_reactor_thread()
    None

stop_reactor() stops the reactor if it's running (regardless of what thread it's in) and waits for the reactor thread to exit, if there is one. In other words, it guarantees that reactor.running is False and get_reactor_thread() is None when it returns:

    >>> stop_reactor()

run_reactor(in_thread) calls Twisted's reactor.run(), with some additional wrapper code to ensure that it runs safely, that only one reactor runs at a time, and that it runs in a separate thread (by default):

    >>> run_reactor()
    >>> get_reactor_thread()
    <RepositoryThread(reactor, started daemon)>

We can then verify that the reactor is in fact running when run_reactor() returns:

    >>> from twisted.internet import reactor
    >>> reactor.running
    1

And then stop it, verifying that both the reactor and its thread have stopped:

    >>> rt = get_reactor_thread()
    >>> stop_reactor()
    >>> print get_reactor_thread()
    None
    >>> rt.isAlive()
    False
    >>> reactor.running
    0

Calling run_reactor() when the reactor is already running in another thread has no effect:

    >>> run_reactor()
    >>> rt = get_reactor_thread()
    >>> run_reactor()
    >>> rt is get_reactor_thread()
    True

But trying to run the reactor in the current thread (by passing a false in_thread argument) when it's already running in some thread raises an error:

    >>> run_reactor(False)
    Traceback (most recent call last):
    ...
    AssertionError: Reactor is already running
    >>> stop_reactor()  # make sure we've stopped again for next test
    >>> reactor.running
    0

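The run/stop rules just demonstrated (at most one loop, a repeated threaded run() is a no-op, running in the current thread while already running is an error, and stop() waits for the thread to exit) can be modelled without Twisted. The following Python 3 sketch is a hypothetical LoopGuard class whose "loop" just blocks on an Event; it illustrates the semantics and is not osaf.startup's implementation:

```python
import threading


class LoopGuard(object):
    """Hypothetical model of run_reactor()/stop_reactor() semantics."""

    def __init__(self):
        self._lock = threading.Lock()
        self._stop = threading.Event()
        self.running = False
        self.thread = None

    def _loop(self):
        # Stand-in for reactor.run(): block until stop() is called.
        self._stop.wait()

    def run(self, in_thread=True):
        with self._lock:
            if self.running:
                if not in_thread:
                    raise AssertionError("Reactor is already running")
                return  # already running somewhere: nothing to do
            self.running = True
            self._stop.clear()
            if in_thread:
                self.thread = threading.Thread(target=self._loop, name="reactor")
                self.thread.daemon = True
                self.thread.start()
                return
        # in_thread is false: run in the calling thread until stop() is called.
        try:
            self._loop()
        finally:
            with self._lock:
                self.running = False

    def stop(self):
        # Ask the loop to exit and wait for its thread, if there is one.
        self._stop.set()
        thread = self.thread
        if thread is not None:
            thread.join()
        with self._lock:
            self.running = False
            self.thread = None
```

The lock makes the check-and-start step atomic, so two callers racing to start the loop cannot both create a thread.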
So, running the Twisted reactor at startup is as simple as defining a normal Startup instance to invoke run_reactor():

    >>> twisted_startup = Startup(
    ...     "twisted", itsView=rv, invoke="osaf.startup.run_reactor"
    ... )
    >>> run_startup(rv)
    >>> reactor.running
    1
    >>> stop_reactor()
    >>> twisted_startup.active = False

Note that it doesn't matter how many Startup instances start the reactor, as it will only be started once, by the first one that tries. If you have startup code that depends on the reactor, therefore, you can either define a Startup for run_reactor and reference it in another Startup, or you can just call run_reactor() in the code that needs it.
Also note that you should not invoke run_reactor() from any thread but the main thread, or you will receive an assertion error in that thread, and the reactor will not start:

    >>> twisted_thread = Thread(
    ...     "dont_do_this", itsView=rv, invoke="osaf.startup.run_reactor"
    ... )
    >>> import sys, StringIO  # trap stderr output
    >>> old_stderr, sys.stderr = sys.stderr, StringIO.StringIO()
    >>> run_startup(rv)
    >>> reactor.running
    0
    >>> wait_until(
    ...     # wait until only one thread is running
    ...     lambda: len(threading.enumerate())==1
    ... )
    >>> print sys.stderr.getvalue()
    Exception in thread //userdata/dont_do_this:
    Traceback (most recent call last):
    ...
    AssertionError: can't start reactor thread except from the main thread...
    >>> sys.stderr = old_stderr
    >>> twisted_thread.active = False  # deactivate the startup we used

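How osaf.startup detects the calling thread isn't shown here. In Python 3 (3.4 and later), a guard like the one that produces this assertion can be written with threading.main_thread(); require_main_thread below is a hypothetical helper, not part of osaf.startup:

```python
import threading


def require_main_thread(what="start reactor thread"):
    """Hypothetical guard: raise AssertionError unless called from the main thread."""
    if threading.current_thread() is not threading.main_thread():
        raise AssertionError("can't %s except from the main thread" % what)
```

Called from a worker thread, the AssertionError surfaces in that thread only, which matches the behaviour above: the error is reported on stderr and the caller's thread ends, while the main thread carries on.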
A TwistedTask is similar to a Thread, in that it begins an independent task with its own repository view. It is different in that all TwistedTask instances run in the same thread: the Twisted "reactor" thread. These tasks are co-operatively multitasked, by receiving callbacks from the Twisted reactor object. If you are comfortable with the Twisted API, or at least want to make use of it, you may find some advantage here over using threads.
As with Startup and Thread, a TwistedTask instance invokes the function or class named by its invoke attribute. The only difference is that the invocation occurs in the Twisted "reactor" thread, after the reactor has been started. Your startup code (function, or class __init__) should accept the TwistedTask item as its sole argument, and it should do whatever reactor setup it then needs to do, such as registering listening ports or scheduling timed callbacks.
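TwistedTask itself relies on Twisted's reactor, which isn't reproduced here. As an analogy only, swapping in Python 3's asyncio event loop in place of Twisted, the following sketch shows the same co-operative model: two tasks interleave on a single thread because each one yields control while it waits:

```python
import asyncio
import threading


async def task(name, delay, log):
    # Each await hands control back to the event loop, letting other tasks run.
    for step in range(2):
        log.append((name, step, threading.get_ident()))
        await asyncio.sleep(delay)


async def main():
    log = []
    await asyncio.gather(task("a", 0.01, log), task("b", 0.01, log))
    return log


log = asyncio.run(main())
```

Every entry in log records the same thread identifier, and the steps of "a" and "b" interleave. A task that blocks without yielding would stall every other task in the loop, which is the same caution that applies to code run through a TwistedTask.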
For our example, we'll create a task that just prints what thread it was called from, and counts the number of items started:

    >>> started = 0
    >>> def hello_from(item):
    ...     global started
    ...     print "running",item,"in",threading.currentThread()
    ...     started += 1

and put it in a module so we can import it (again, we're doing this so you don't have to go read another file to see the source; in the "real world" you would just define the function in the appropriate module to start with):

    >>> TestStartups.hello_from = hello_from

Now let's create the task item and run it:

    >>> from osaf.startup import TwistedTask
    >>> demo = TwistedTask(
    ...     "demo", itsView=rv, invoke="osaf.tests.TestStartups.hello_from"
    ... )

We run the startups, giving the task a chance to run and complete:

    >>> run_startup(rv); wait_until(lambda: started)  # give it a chance to run
    running <TwistedTask ... demo ...> in <RepositoryThread(reactor,...)>
    >>> stop_reactor()  # shut down the reactor again
    >>> len(threading.enumerate())  # all other threads should be stopped now
    1
    >>> demo.active = False  # disable for next test(s)

Sometimes you don't need the full power of the Twisted API, or don't want to take the time to learn it for a quick hack. Or, even if you're familiar with the Twisted API, you'd like to have a convenient way to repeatedly run some code every N seconds or minutes. Then you can use a PeriodicTask item.
A PeriodicTask is similar to a TwistedTask, but its invoke target must be a class or factory function that returns an object with a run() method. The class or factory will be invoked in the reactor thread at startup, and the run() method will be called in its own thread repeatedly as long as it returns True from each call. If it returns False, None, or any other false value (or raises an uncaught exception) the repeated calls will cease. The interval between calls is determined by the PeriodicTask item's interval attribute, which must be set to a datetime.timedelta object.
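The repetition rules above can be illustrated with a plain Python 3 sketch. It is a hypothetical run_periodically function, not Chandler's PeriodicTask: it creates the task object once via factory(item), then calls run() on a single dedicated worker thread (where PeriodicTask uses pool threads), stopping when run() returns a false value or raises. Its optional run_at_startup argument mirrors the item attribute of the same name described later:

```python
import threading
from datetime import timedelta


def run_periodically(factory, item, interval, run_at_startup=False):
    """Hypothetical sketch of PeriodicTask-style repetition.

    Creates the task object once, then calls its run() method every
    ``interval`` (a datetime.timedelta) until run() returns a false value
    or raises.  Returns (task, worker thread, cancel Event).
    """
    task = factory(item)  # like __init__ being called once at startup
    delay = interval.total_seconds()
    cancelled = threading.Event()

    def loop():
        wait_first = not run_at_startup
        while True:
            # Event.wait() returns True only if cancellation was requested.
            if wait_first and cancelled.wait(delay):
                return
            wait_first = True
            try:
                keep_going = task.run()
            except Exception:
                return  # an uncaught exception ends the repetition
            if not keep_going:
                return  # False, None, 0, ... also end it

    worker = threading.Thread(target=loop, name="periodic-sketch")
    worker.daemon = True
    worker.start()
    return task, worker, cancelled


class Counter(object):
    """Toy task: asks to be called again until run() has run five times."""

    def __init__(self, item):
        self.item = item
        self.calls = 0

    def run(self):
        self.calls += 1
        return self.calls < 5
```

For example, `run_periodically(Counter, "counter", timedelta(microseconds=1))` returns once the worker has started, and the worker exits by itself after the fifth call.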
For our example, we'll create a simple class suitable for use as a periodic task:

    >>> run_count = 0
    >>> class Counter:
    ...     def __init__(self, item):
    ...         self.item = item
    ...         print "__init__",
    ...         hello_from(item)    # show what thread we were called from
    ...
    ...     def run(self):
    ...         global run_count
    ...         print "run() call", run_count+1,
    ...         hello_from(self.item)   # show what thread we were called from
    ...         run_count += 1
    ...         return run_count<5  # stop when count reaches 5

And once again, we'll insert this class into the TestStartups module, so you don't have to go there to read the source (in your own code, you would simply set invoke to wherever your classes or functions are defined):

    >>> TestStartups.Counter = Counter

And now we'll create a PeriodicTask to create the counter and call it once per microsecond (so the test runs quickly!):

    >>> from datetime import timedelta
    >>> from osaf.startup import PeriodicTask
    >>> counter = PeriodicTask(
    ...     "counter", itsView=rv, invoke="osaf.tests.TestStartups.Counter",
    ...     interval=timedelta(microseconds=1)
    ... )
    >>> run_startup(rv); wait_until(lambda: run_count==5)
    __init__ running ... in <RepositoryThread(reactor,...)>
    run() call 1 running ... in <RepositoryThread(RepositoryPoolThread...)>
    run() call 2 running ... in <RepositoryThread(RepositoryPoolThread...)>
    run() call 3 running ... in <RepositoryThread(RepositoryPoolThread...)>
    run() call 4 running ... in <RepositoryThread(RepositoryPoolThread...)>
    run() call 5 running ... in <RepositoryThread(RepositoryPoolThread...)>

Notice that the __init__ method is called at startup time in the reactor thread, but subsequent run() calls occur in various "pool" threads managed by Twisted. It is not guaranteed that your run() method will execute in the same thread from one call to the next, so you should not rely on it being so. (We aren't showing the thread IDs in the example above, because they're not only different from one call to the next, they're also different from one test run to the next, so the test would fail if we included that info.)
Sometimes, you may want your task to run at startup, as well as after the first delay interval. You can set the run_at_startup flag to enable this:

    >>> counter.run_at_startup = True

To demonstrate, we'll set the delay interval to a year:

    >>> counter.interval = timedelta(days=365)
    >>> run_count = 0  # reset the counter
    >>> run_startup(rv); wait_until(lambda: run_count)
    run() call 1 running ... in <RepositoryThread(RepositoryPoolThread...)>

Notice that call 1 ran, even though there's a year between runs.
This is probably a good time to mention that we can manually force a task to be run immediately using the run_once() method:

    >>> counter.run_once(); wait_until(lambda: run_count==2)
    run() call 2 running ... in <RepositoryThread(RepositoryPoolThread...)>

and that we can change the interval of a running PeriodicTask, using the reschedule() method:

    >>> counter.reschedule(
    ...     timedelta(microseconds=1)
    ... ); wait_until(lambda: run_count==5)
    run() call 3 running ... in <RepositoryThread(RepositoryPoolThread...)>
    run() call 4 running ... in <RepositoryThread(RepositoryPoolThread...)>
    run() call 5 running ... in <RepositoryThread(RepositoryPoolThread...)>

There's also a stop() method, which prevents any further invocations from occurring until you call reschedule() again. If you don't provide an interval argument to reschedule(), it defaults to the current interval of the PeriodicTask you called it on:

    >>> counter.stop()
    >>> counter.reschedule(); wait_until(lambda: run_count==6)
    __init__ running ... in <RepositoryThread(reactor,...)>
    run() call 6 running ... in <RepositoryThread(RepositoryPoolThread...)>

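One way to get run_once(), stop() and reschedule() behaviour is a self-re-arming timer. The Python 3 sketch below builds a hypothetical PeriodicSketch class on threading.Timer. Chandler's PeriodicTask schedules through Twisted instead, so treat this as an illustration of the semantics only. A generation counter keeps a timer that was already firing when stop() or reschedule() ran from re-arming itself:

```python
import threading
from datetime import timedelta


class PeriodicSketch(object):
    """Hypothetical run_once()/stop()/reschedule() built on threading.Timer."""

    def __init__(self, target, interval):
        self.target = target      # zero-argument callable; a true result means "call me again"
        self.interval = interval  # a datetime.timedelta
        self._lock = threading.Lock()
        self._timer = None
        self._generation = 0      # bumped by stop()/reschedule() so stale timers don't re-arm

    def _arm(self, generation):
        # Caller must hold self._lock.
        self._timer = threading.Timer(
            self.interval.total_seconds(), self._fire, args=(generation,))
        self._timer.daemon = True
        self._timer.start()

    def _fire(self, generation):
        keep_going = self.target()
        with self._lock:
            if keep_going and generation == self._generation:
                self._arm(generation)

    def stop(self):
        """Prevent further scheduled calls until reschedule() is called."""
        with self._lock:
            self._generation += 1
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None

    def reschedule(self, interval=None):
        """(Re)start the schedule, keeping the current interval if none is given."""
        with self._lock:
            self._generation += 1
            if self._timer is not None:
                self._timer.cancel()
            if interval is not None:
                self.interval = interval
            self._arm(self._generation)

    def run_once(self):
        """Call the target right away on its own thread; the schedule is untouched."""
        worker = threading.Thread(target=self.target)
        worker.daemon = True
        worker.start()
        return worker
```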
(Note that if you stop() your periodic task, the task object (e.g. the Counter instance in our examples above) may be garbage collected by Python, if there are no longer any active references to it. If this is the case, your next attempt to run_once(), stop(), or reschedule() the task will create a new instance, such that its __init__() method is called again.)
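The note above describes what happens when nothing keeps the task object alive. The following Python 3 sketch shows that effect with a hypothetical TaskHolder that keeps only a weak reference to the object its factory created. Whether PeriodicTask holds its object this way is an assumption made for illustration. Once the last strong reference disappears, the next lookup builds a fresh instance and __init__() runs again:

```python
import weakref


class Counter(object):
    """Toy task object that counts how many times it has been constructed."""
    instances_created = 0

    def __init__(self, item):
        Counter.instances_created += 1
        self.item = item


class TaskHolder(object):
    """Hypothetical holder that keeps only a weak reference to the task."""

    def __init__(self, factory, item):
        self.factory = factory
        self.item = item
        self._ref = None

    def get_task(self):
        task = self._ref() if self._ref is not None else None
        if task is None:
            task = self.factory(self.item)  # __init__ runs again for the new instance
            self._ref = weakref.ref(task)
        return task
```

In CPython, deleting the last strong reference to the first instance frees it immediately, so a second get_task() call creates a new Counter.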
Anyway, if we turn the run_at_startup flag back off, the task doesn't get a run() call at startup:

    >>> counter.stop()
    >>> counter.interval = timedelta(days=365)
    >>> counter.run_at_startup = False
    >>> started = 0  # reset the counter
    >>> run_startup(rv); wait_until(lambda: started)
    __init__ running ... in <RepositoryThread(reactor,...)>

Finally, let's clean up after our PeriodicTask example:

    >>> stop_reactor()
    >>> len(threading.enumerate())  # all other threads should be stopped now
    1
    >>> counter.active = False  # disable for subsequent tests

When running background tasks that use the repository, each logical task or individual thread needs its own repository view, so that it doesn't become confused by changes being made by other threads or tasks at the same time. TwistedTask, PeriodicTask, and Thread items handle this for you automatically, by calling the fork_item() API on an item. fork_item() opens a new repository view and returns a copy of the item that is native to the new view. (Note that this means you should periodically refresh the view, to ensure that you are seeing an up-to-date persistent state for the item(s).)
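The view semantics described here (a forked copy sees only committed state and picks up later commits only after a refresh) can be modelled in a few lines. The Python 3 sketch below uses hypothetical ToyRepository/ToyView classes and a toy fork_item(view, key) function. It is deliberately unrelated to Chandler's repository classes, and its fork_item has a different signature from osaf.startup.fork_item:

```python
import copy
import threading


class ToyRepository(object):
    """Hypothetical in-memory store of committed state, shared by all views."""

    def __init__(self):
        self._committed = {}
        self._lock = threading.Lock()

    def open_view(self):
        return ToyView(self)


class ToyView(object):
    """Each view works on a private snapshot of the committed state."""

    def __init__(self, repository):
        self.repository = repository
        self.refresh()

    def refresh(self):
        # Pick up whatever other views have committed so far.
        with self.repository._lock:
            self.items = copy.deepcopy(self.repository._committed)

    def commit(self):
        # Publish this view's items so that new or refreshed views can see them.
        with self.repository._lock:
            self.repository._committed.update(copy.deepcopy(self.items))


def fork_item(view, key):
    """Toy fork: open a new view and return (new view, its copy of the item).
    Uncommitted changes in ``view`` are not visible in the new view."""
    new_view = view.repository.open_view()
    return new_view, new_view.items.get(key)
```

Forking before the original view commits yields no item at all, and a forked copy keeps its old state until its own view is refreshed.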
To demonstrate, let's set up a "real" (in-memory) repository, since null repository views can't be forked:

    >>> from repository.persistence.DBRepository import DBRepository
    >>> import os, repository
    >>> rep = DBRepository('__nosuchfile__')
    >>> rep.create(ramdb=True, refcounted=True)

First, we'll create an item we want to fork:

    >>> anItem = schema.Item("Demo", rep.view)
    >>> anItem
    <Item (new): Demo ...>

Then, we'll commit the repository, because otherwise our item won't be visible in the new view:

    >>> rep.view.commit()

And now we can fork the item:

    >>> from osaf.startup import fork_item
    >>> newItem = fork_item(anItem)
    >>> newItem
    <Item: Demo ...>
    >>> newItem.itsUUID == anItem.itsUUID
    True
    >>> newItem.itsView is anItem.itsView
    False
    >>> newItem.itsView.repository is anItem.itsView.repository
    True

Note that you should close a view when you are done with it, to avoid possible memory or resource leaks:

    >>> newItem.itsView.closeView()
    >>> anItem.itsView.closeView()

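To make sure a view is closed even when the code using it raises, one option is a small context manager. The Python 3 sketch below defines a hypothetical closing_view helper that assumes only that the object has a closeView() method. DummyView is a stand-in used to show the behaviour, not a repository class:

```python
from contextlib import contextmanager


@contextmanager
def closing_view(view):
    """Yield ``view`` and always call its closeView() method on exit."""
    try:
        yield view
    finally:
        view.closeView()


class DummyView(object):
    """Stand-in object that records whether closeView() was called."""
    closed = False

    def closeView(self):
        self.closed = True
```

With a forked item, usage would look like `with closing_view(newItem.itsView): ...`, and the view is closed when the block exits, whether normally or through an exception.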
For this test document, we'll also close the repository and our null repository view:

    >>> rep.close()
    >>> rv.closeView()