"Fossies" - the Fresh Open Source Software Archive

Member "scons-3.0.5/engine/SCons/Job.py" (26 Mar 2019, 16127 Bytes) of package /linux/misc/scons-3.0.5.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively you can here view or download the uninterpreted source code file. For more information about "Job.py" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 3.0.4_vs_3.0.5.

    1 """SCons.Job
    2 
    3 This module defines the Serial and Parallel classes that execute tasks to
    4 complete a build. The Jobs class provides a higher level interface to start,
    5 stop, and wait on jobs.
    6 
    7 """
    8 
    9 #
   10 # Copyright (c) 2001 - 2019 The SCons Foundation
   11 #
   12 # Permission is hereby granted, free of charge, to any person obtaining
   13 # a copy of this software and associated documentation files (the
   14 # "Software"), to deal in the Software without restriction, including
   15 # without limitation the rights to use, copy, modify, merge, publish,
   16 # distribute, sublicense, and/or sell copies of the Software, and to
   17 # permit persons to whom the Software is furnished to do so, subject to
   18 # the following conditions:
   19 #
   20 # The above copyright notice and this permission notice shall be included
   21 # in all copies or substantial portions of the Software.
   22 #
   23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
   24 # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
   25 # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
   26 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
   27 # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
   28 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
   29 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   30 #
   31 
   32 __revision__ = "src/engine/SCons/Job.py a56bbd8c09fb219ab8a9673330ffcd55279219d0 2019-03-26 23:16:31 bdeegan"
   33 
   34 import SCons.compat
   35 
   36 import os
   37 import signal
   38 
   39 import SCons.Errors
   40 
# The default stack size (in kilobytes) of the threads used to execute
# jobs in parallel.
#
# We use a stack size of 256 kilobytes. The default on some platforms
# is too large and prevents us from creating enough threads to fully
# parallelize the build. For example, the default stack size on linux
# is 8 MBytes.

# Stack size override, in kilobytes.  While it is None, Jobs falls back
# to default_stack_size when it creates a parallel job.
explicit_stack_size = None
default_stack_size = 256

# Error string attached to the BuildError raised for tasks that are cut
# short because the build was interrupted.
interrupt_msg = 'Build interrupted.'
   53 
   54 
   55 class InterruptState(object):
   56    def __init__(self):
   57        self.interrupted = False
   58 
   59    def set(self):
   60        self.interrupted = True
   61 
   62    def __call__(self):
   63        return self.interrupted
   64 
   65 
   66 class Jobs(object):
   67     """An instance of this class initializes N jobs, and provides
   68     methods for starting, stopping, and waiting on all N jobs.
   69     """
   70 
   71     def __init__(self, num, taskmaster):
   72         """
   73         Create 'num' jobs using the given taskmaster.
   74 
   75         If 'num' is 1 or less, then a serial job will be used,
   76         otherwise a parallel job with 'num' worker threads will
   77         be used.
   78 
   79         The 'num_jobs' attribute will be set to the actual number of jobs
   80         allocated.  If more than one job is requested but the Parallel
   81         class can't do it, it gets reset to 1.  Wrapping interfaces that
   82         care should check the value of 'num_jobs' after initialization.
   83         """
   84 
   85         self.job = None
   86         if num > 1:
   87             stack_size = explicit_stack_size
   88             if stack_size is None:
   89                 stack_size = default_stack_size
   90                 
   91             try:
   92                 self.job = Parallel(taskmaster, num, stack_size)
   93                 self.num_jobs = num
   94             except NameError:
   95                 pass
   96         if self.job is None:
   97             self.job = Serial(taskmaster)
   98             self.num_jobs = 1
   99 
  100     def run(self, postfunc=lambda: None):
  101         """Run the jobs.
  102 
  103         postfunc() will be invoked after the jobs has run. It will be
  104         invoked even if the jobs are interrupted by a keyboard
  105         interrupt (well, in fact by a signal such as either SIGINT,
  106         SIGTERM or SIGHUP). The execution of postfunc() is protected
  107         against keyboard interrupts and is guaranteed to run to
  108         completion."""
  109         self._setup_sig_handler()
  110         try:
  111             self.job.start()
  112         finally:
  113             postfunc()
  114             self._reset_sig_handler()
  115 
  116     def were_interrupted(self):
  117         """Returns whether the jobs were interrupted by a signal."""
  118         return self.job.interrupted()
  119 
  120     def _setup_sig_handler(self):
  121         """Setup an interrupt handler so that SCons can shutdown cleanly in
  122         various conditions:
  123 
  124           a) SIGINT: Keyboard interrupt
  125           b) SIGTERM: kill or system shutdown
  126           c) SIGHUP: Controlling shell exiting
  127 
  128         We handle all of these cases by stopping the taskmaster. It
  129         turns out that it's very difficult to stop the build process
  130         by throwing asynchronously an exception such as
  131         KeyboardInterrupt. For example, the python Condition
  132         variables (threading.Condition) and queues do not seem to be
  133         asynchronous-exception-safe. It would require adding a whole
  134         bunch of try/finally block and except KeyboardInterrupt all
  135         over the place.
  136 
  137         Note also that we have to be careful to handle the case when
  138         SCons forks before executing another process. In that case, we
  139         want the child to exit immediately.
  140         """
  141         def handler(signum, stack, self=self, parentpid=os.getpid()):
  142             if os.getpid() == parentpid:
  143                 self.job.taskmaster.stop()
  144                 self.job.interrupted.set()
  145             else:
  146                 os._exit(2)
  147 
  148         self.old_sigint  = signal.signal(signal.SIGINT, handler)
  149         self.old_sigterm = signal.signal(signal.SIGTERM, handler)
  150         try:
  151             self.old_sighup = signal.signal(signal.SIGHUP, handler)
  152         except AttributeError:
  153             pass
  154 
  155     def _reset_sig_handler(self):
  156         """Restore the signal handlers to their previous state (before the
  157          call to _setup_sig_handler()."""
  158 
  159         signal.signal(signal.SIGINT, self.old_sigint)
  160         signal.signal(signal.SIGTERM, self.old_sigterm)
  161         try:
  162             signal.signal(signal.SIGHUP, self.old_sighup)
  163         except AttributeError:
  164             pass
  165 
  166 class Serial(object):
  167     """This class is used to execute tasks in series, and is more efficient
  168     than Parallel, but is only appropriate for non-parallel builds. Only
  169     one instance of this class should be in existence at a time.
  170 
  171     This class is not thread safe.
  172     """
  173 
  174     def __init__(self, taskmaster):
  175         """Create a new serial job given a taskmaster. 
  176 
  177         The taskmaster's next_task() method should return the next task
  178         that needs to be executed, or None if there are no more tasks. The
  179         taskmaster's executed() method will be called for each task when it
  180         is successfully executed, or failed() will be called if it failed to
  181         execute (e.g. execute() raised an exception)."""
  182         
  183         self.taskmaster = taskmaster
  184         self.interrupted = InterruptState()
  185 
  186     def start(self):
  187         """Start the job. This will begin pulling tasks from the taskmaster
  188         and executing them, and return when there are no more tasks. If a task
  189         fails to execute (i.e. execute() raises an exception), then the job will
  190         stop."""
  191         
  192         while True:
  193             task = self.taskmaster.next_task()
  194 
  195             if task is None:
  196                 break
  197 
  198             try:
  199                 task.prepare()
  200                 if task.needs_execute():
  201                     task.execute()
  202             except Exception as e:
  203                 if self.interrupted():
  204                     try:
  205                         raise SCons.Errors.BuildError(
  206                             task.targets[0], errstr=interrupt_msg)
  207                     except:
  208                         task.exception_set()
  209                 else:
  210                     task.exception_set()
  211 
  212                 # Let the failed() callback function arrange for the
  213                 # build to stop if that's appropriate.
  214                 task.failed()
  215             else:
  216                 task.executed()
  217 
  218             task.postprocess()
  219         self.taskmaster.cleanup()
  220 
  221 
  222 # Trap import failure so that everything in the Job module but the
  223 # Parallel class (and its dependent classes) will work if the interpreter
  224 # doesn't support threads.
  225 try:
  226     import queue
  227     import threading
  228 except ImportError:
  229     pass
  230 else:
  231     class Worker(threading.Thread):
  232         """A worker thread waits on a task to be posted to its request queue,
  233         dequeues the task, executes it, and posts a tuple including the task
  234         and a boolean indicating whether the task executed successfully. """
  235 
  236         def __init__(self, requestQueue, resultsQueue, interrupted):
  237             threading.Thread.__init__(self)
  238             self.setDaemon(1)
  239             self.requestQueue = requestQueue
  240             self.resultsQueue = resultsQueue
  241             self.interrupted = interrupted
  242             self.start()
  243 
  244         def run(self):
  245             while True:
  246                 task = self.requestQueue.get()
  247 
  248                 if task is None:
  249                     # The "None" value is used as a sentinel by
  250                     # ThreadPool.cleanup().  This indicates that there
  251                     # are no more tasks, so we should quit.
  252                     break
  253 
  254                 try:
  255                     if self.interrupted():
  256                         raise SCons.Errors.BuildError(
  257                             task.targets[0], errstr=interrupt_msg)
  258                     task.execute()
  259                 except:
  260                     task.exception_set()
  261                     ok = False
  262                 else:
  263                     ok = True
  264 
  265                 self.resultsQueue.put((task, ok))
  266 
  267     class ThreadPool(object):
  268         """This class is responsible for spawning and managing worker threads."""
  269 
  270         def __init__(self, num, stack_size, interrupted):
  271             """Create the request and reply queues, and 'num' worker threads.
  272             
  273             One must specify the stack size of the worker threads. The
  274             stack size is specified in kilobytes.
  275             """
  276             self.requestQueue = queue.Queue(0)
  277             self.resultsQueue = queue.Queue(0)
  278 
  279             try:
  280                 prev_size = threading.stack_size(stack_size*1024) 
  281             except AttributeError as e:
  282                 # Only print a warning if the stack size has been
  283                 # explicitly set.
  284                 if not explicit_stack_size is None:
  285                     msg = "Setting stack size is unsupported by this version of Python:\n    " + \
  286                         e.args[0]
  287                     SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
  288             except ValueError as e:
  289                 msg = "Setting stack size failed:\n    " + str(e)
  290                 SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
  291 
  292             # Create worker threads
  293             self.workers = []
  294             for _ in range(num):
  295                 worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
  296                 self.workers.append(worker)
  297 
  298             if 'prev_size' in locals():
  299                 threading.stack_size(prev_size)
  300 
  301         def put(self, task):
  302             """Put task into request queue."""
  303             self.requestQueue.put(task)
  304 
  305         def get(self):
  306             """Remove and return a result tuple from the results queue."""
  307             return self.resultsQueue.get()
  308 
  309         def preparation_failed(self, task):
  310             self.resultsQueue.put((task, False))
  311 
  312         def cleanup(self):
  313             """
  314             Shuts down the thread pool, giving each worker thread a
  315             chance to shut down gracefully.
  316             """
  317             # For each worker thread, put a sentinel "None" value
  318             # on the requestQueue (indicating that there's no work
  319             # to be done) so that each worker thread will get one and
  320             # terminate gracefully.
  321             for _ in self.workers:
  322                 self.requestQueue.put(None)
  323 
  324             # Wait for all of the workers to terminate.
  325             # 
  326             # If we don't do this, later Python versions (2.4, 2.5) often
  327             # seem to raise exceptions during shutdown.  This happens
  328             # in requestQueue.get(), as an assertion failure that
  329             # requestQueue.not_full is notified while not acquired,
  330             # seemingly because the main thread has shut down (or is
  331             # in the process of doing so) while the workers are still
  332             # trying to pull sentinels off the requestQueue.
  333             #
  334             # Normally these terminations should happen fairly quickly,
  335             # but we'll stick a one-second timeout on here just in case
  336             # someone gets hung.
  337             for worker in self.workers:
  338                 worker.join(1.0)
  339             self.workers = []
  340 
  341     class Parallel(object):
  342         """This class is used to execute tasks in parallel, and is somewhat 
  343         less efficient than Serial, but is appropriate for parallel builds.
  344 
  345         This class is thread safe.
  346         """
  347 
  348         def __init__(self, taskmaster, num, stack_size):
  349             """Create a new parallel job given a taskmaster.
  350 
  351             The taskmaster's next_task() method should return the next
  352             task that needs to be executed, or None if there are no more
  353             tasks. The taskmaster's executed() method will be called
  354             for each task when it is successfully executed, or failed()
  355             will be called if the task failed to execute (i.e. execute()
  356             raised an exception).
  357 
  358             Note: calls to taskmaster are serialized, but calls to
  359             execute() on distinct tasks are not serialized, because
  360             that is the whole point of parallel jobs: they can execute
  361             multiple tasks simultaneously. """
  362 
  363             self.taskmaster = taskmaster
  364             self.interrupted = InterruptState()
  365             self.tp = ThreadPool(num, stack_size, self.interrupted)
  366 
  367             self.maxjobs = num
  368 
  369         def start(self):
  370             """Start the job. This will begin pulling tasks from the
  371             taskmaster and executing them, and return when there are no
  372             more tasks. If a task fails to execute (i.e. execute() raises
  373             an exception), then the job will stop."""
  374 
  375             jobs = 0
  376             
  377             while True:
  378                 # Start up as many available tasks as we're
  379                 # allowed to.
  380                 while jobs < self.maxjobs:
  381                     task = self.taskmaster.next_task()
  382                     if task is None:
  383                         break
  384 
  385                     try:
  386                         # prepare task for execution
  387                         task.prepare()
  388                     except:
  389                         task.exception_set()
  390                         task.failed()
  391                         task.postprocess()
  392                     else:
  393                         if task.needs_execute():
  394                             # dispatch task
  395                             self.tp.put(task)
  396                             jobs = jobs + 1
  397                         else:
  398                             task.executed()
  399                             task.postprocess()
  400 
  401                 if not task and not jobs: break
  402 
  403                 # Let any/all completed tasks finish up before we go
  404                 # back and put the next batch of tasks on the queue.
  405                 while True:
  406                     task, ok = self.tp.get()
  407                     jobs = jobs - 1
  408 
  409                     if ok:
  410                         task.executed()
  411                     else:
  412                         if self.interrupted():
  413                             try:
  414                                 raise SCons.Errors.BuildError(
  415                                     task.targets[0], errstr=interrupt_msg)
  416                             except:
  417                                 task.exception_set()
  418 
  419                         # Let the failed() callback function arrange
  420                         # for the build to stop if that's appropriate.
  421                         task.failed()
  422 
  423                     task.postprocess()
  424 
  425                     if self.tp.resultsQueue.empty():
  426                         break
  427 
  428             self.tp.cleanup()
  429             self.taskmaster.cleanup()
  430 
  431 # Local Variables:
  432 # tab-width:4
  433 # indent-tabs-mode:nil
  434 # End:
  435 # vim: set expandtab tabstop=4 shiftwidth=4: