"Fossies" - the Fresh Open Source Software Archive

Member "muscle/system/ThreadPool.h" (8 Jun 2019, 8917 Bytes) of package /linux/privat/muscle7.30.zip:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "ThreadPool.h", see the Fossies "Dox" file reference documentation.

    1 /* This file is Copyright 2000-2013 Meyer Sound Laboratories Inc.  See the included LICENSE.txt file for details. */  
    2 
    3 #ifndef MuscleThreadPool_h
    4 #define MuscleThreadPool_h
    5 
    6 #include "system/Thread.h"
    7 #include "system/Mutex.h"
    8 #include "util/Queue.h"
    9 #include "util/RefCount.h"
   10 #include "util/NetworkUtilityFunctions.h"
   11 #include "util/ObjectPool.h"  // for AbstractObjectRecycler
   12 
   13 namespace muscle {
   14 
   15 class ThreadPool;
   16 
   17 /** This is an interface class that should be implemented by objects that want to make use of a ThreadPool. */
   18 class IThreadPoolClient
   19 {
   20 public:
   21    /** Constructor.  Starts out unregistered (_threadPool == NULL) and then calls SetThreadPool(threadPool).
   22      * @param threadPool Pointer to the ThreadPool object this client should register with, or NULL if you wish this client
   23      *                   to start out unregistered.  (If the latter, be sure to call SetThreadPool() later on).
   24      */
   25    IThreadPoolClient(ThreadPool * threadPool) : _threadPool(NULL) {SetThreadPool(threadPool);}
   26 
   27    /** Destructor.  Note that if this object is still registered with a ThreadPool when this destructor is called,
   28      * an assertion failure will be triggered -- registered IThreadPoolClient objects MUST call SetThreadPool(NULL)
   29      * BEFORE any part of them is destroyed, in order to avoid race conditions between their destructors and
   30      * their MessageReceivedFromThreadPool() method callbacks.
   31      */
   32    virtual ~IThreadPoolClient();
   33 
   34    /** Sends the specified MessageRef object to the ThreadPool for later handling.
   35      * @param msg A Message for the ThreadPool to handle.  Our MessageReceivedFromThreadPool() method
   36      *            will be called in the near future, from within a ThreadPool thread.
   37      * @returns B_NO_ERROR if the Message was scheduled for execution by a thread in the ThreadPool, or B_ERROR if it wasn't.
   38      * Messages are guaranteed to be processed in the order that they were passed to this method, but there is no
   39      * guarantee that they will all be processed in the same thread as each other.
   40      */
   41    status_t SendMessageToThreadPool(const MessageRef & msg);
   42 
   43    /** Changes this IThreadPoolClient to use a different ThreadPool.  This will unregister this client
   44      * from the current ThreadPool if necessary, and register with the new one if necessary.
   45      *
   46      * @param tp The new ThreadPool to register with, or NULL if you only want to unregister from the old one.
   47      *
   48      * Note that it is REQUIRED for registered IThreadPoolClient objects to call SetThreadPool(NULL) to
   49      * un-register themselves from their ThreadPool BEFORE their destructors start to tear down their state;
   50      * otherwise race conditions will result if the ThreadPool object happens to call MessageReceivedFromThreadPool()
   51      * on the partially-destroyed IThreadPoolClient object.
   52      *
   53      * Note that if Message callbacks for this IThreadPoolClient are still pending in the ThreadPool object
   54      * when this method un-registers the IThreadPoolClient object, then this method will block until after all those
   55      * callbacks have completed.
   56      */
   57    void SetThreadPool(ThreadPool * tp);
   58    
   59    /** Returns a pointer to the ThreadPool this client is currently registered with, or NULL if it isn't registered anywhere. */
   60    ThreadPool * GetThreadPool() const {return _threadPool;}
   61 
   62 protected:
   63    /** Called from inside one of the ThreadPool's threads, some time after SendMessageToThreadPool(msg) was
   64      * called by the IThreadPoolClient.
   65      * @param msg The MessageRef that was passed to SendMessageToThreadPool().
   66      * @param numLeft The number of additional Messages that will arrive in this batch after this one.
   67      * Note that since this method will be called in a different thread from the thread that called
   68      * SendMessageToThreadPool(), implementations of this method need to be careful when accessing member
   69      * variables to avoid race conditions.
   70      */
   71    virtual void MessageReceivedFromThreadPool(const MessageRef & msg, uint32 numLeft) = 0;
   72 
   73 private:
   74    friend class ThreadPool;  // lets ThreadPool::MessageReceivedFromThreadPoolAux() invoke our protected callback
   75 
   76    ThreadPool * _threadPool;  // value returned by GetThreadPool(); NULL when unregistered.  NOTE(review): this class is implicitly copyable, and a copy would carry this pointer without going through SetThreadPool() -- confirm whether copying should be disallowed
   77 };
   78 
   79 /** This class allows you to multiplex the handling of a large number of parallel Message streams
   80   * into a finite number of actual Threads.  By doing so you can (roughly) simulate the behavior
   81   * of a much larger number of Threads than are actually created.
   82   *
   83   * This class is Thread-safe, in that you can have IThreadPoolClients using it from different
   84   * threads simultaneously, and it will still work as expected.
   85   */
   86 class ThreadPool : private AbstractObjectRecycler
   87 {
   88 public:
   89    /** Constructor.
   90      * @param maxThreadCount The maximum number of Threads this ThreadPool object will be allowed
   91      *                       to create at once.  Defaults to 16.
   92      */
   93    ThreadPool(uint32 maxThreadCount = 16);
   94 
   95    /** Destructor.  */
   96    virtual ~ThreadPool();
   97 
   98    /** Returns the maximum number of threads this ThreadPool is allowed to keep around at one time (as specified in the constructor) */
   99    uint32 GetMaxThreadCount() const {return _maxThreadCount;}
  100 
  101    /** Used for debugging only:  prints this pool's address, settings, and the item-counts of its internal tables to stdout via printf().  NOTE(review): %p formally expects a void *, but (this) is passed uncast. */
  102    virtual void PrintToStream() const 
  103    {
  104       printf("ThreadPool %p:  _maxThreadCount=" UINT32_FORMAT_SPEC ", _shuttingDown=%i, _threadIDCounter=" UINT32_FORMAT_SPEC ", _availableThreads=" UINT32_FORMAT_SPEC ", _activeThreads=" UINT32_FORMAT_SPEC ", _registeredClients=" UINT32_FORMAT_SPEC ", _pendingMessages=" UINT32_FORMAT_SPEC ", _deferredMessages=" UINT32_FORMAT_SPEC ", _waitingForCompletion=" UINT32_FORMAT_SPEC "\n", this, _maxThreadCount, _shuttingDown, _threadIDCounter, _availableThreads.GetNumItems(), _activeThreads.GetNumItems(), _registeredClients.GetNumItems(), _pendingMessages.GetNumItems(), _deferredMessages.GetNumItems(), _waitingForCompletion.GetNumItems());
  105    }
  106 
  107 protected:
  108    /** Starts the specified Thread object's internal thread running.
  109      * Broken out into a virtual method so that the Thread's attributes (stack size, etc) can be customized if desired.
  110      * Default implementation just calls StartInternalThread() on the thread object.
  111      * @param thread The Thread object to start.
  112      * @returns B_NO_ERROR if the Thread was successfully started, or B_ERROR otherwise.
  113      */
  114    virtual status_t StartInternalThread(Thread & thread);
  115 
  116 private:
  117    virtual void RecycleObject(void * /*obj*/) {/* empty */}  // intentionally a no-op
  118    virtual uint32 FlushCachedObjects() {return Shutdown();}  // called by SetupSystem destructor, to avoid crashes on exit
  119    uint32 Shutdown();  // its return value is what FlushCachedObjects() returns
  120 
  121 #ifndef DOXYGEN_SHOULD_IGNORE_THIS  // this is here so doxygen-coverage won't complain that I haven't documented this class -- but it's a private class so I don't need to
  122    class ThreadPoolThread : public Thread, public RefCountable
  123    {
  124    public:
  125       ThreadPoolThread(ThreadPool * tp, uint32 threadID) : _threadID(threadID), _threadPool(tp), _currentClient(NULL) {/* empty */}
  126 
  127       uint32 GetThreadID() const {return _threadID;}
  128       status_t SendMessagesToInternalThread(IThreadPoolClient * client, Queue<MessageRef> & mq);
  129 
  130    protected:
  131       virtual status_t MessageReceivedFromOwner(const MessageRef & msgRef, uint32 numLeft);
  132 
  133    private:
  134       const uint32 _threadID;              // ID passed to the constructor; returned by GetThreadID()
  135       ThreadPool * _threadPool;            // the ThreadPool passed to the constructor
  136       IThreadPoolClient * _currentClient;  // starts out NULL
  137       Queue<MessageRef> _internalQueue;
  138    };
  139    DECLARE_REFTYPES(ThreadPoolThread);  // presumably declares the ThreadPoolThreadRef type used by the tables below -- confirm against util/RefCount.h
  140 #endif
  141 
  142    friend class IThreadPoolClient;
  143    friend class ThreadPoolThread;
  144 
  145    void RegisterClient(IThreadPoolClient * client);
  146    void UnregisterClient(IThreadPoolClient * client);
  147    status_t SendMessageToThreadPool(IThreadPoolClient * client, const MessageRef & msg);
  148    void DispatchPendingMessagesUnsafe();  // _poolLock must be locked when this is called!
  149    void ThreadFinishedProcessingClientMessages(uint32 threadID, IThreadPoolClient * client);
  150    bool DoesClientHaveMessagesOutstandingUnsafe(IThreadPoolClient * client) const;  // NOTE(review): "Unsafe" suffix suggests _poolLock must already be held here too -- confirm
  151    void MessageReceivedFromThreadPoolAux(IThreadPoolClient * client, const MessageRef & msg, uint32 numLeft) {client->MessageReceivedFromThreadPool(msg, numLeft);}  // just to skirt protected-member issues 
  152 
  153    const uint32 _maxThreadCount;  // set at construction; returned by GetMaxThreadCount()
  154 
  155    Mutex _poolLock;  // must be held while DispatchPendingMessagesUnsafe() is called
  156    bool _shuttingDown;
  157 
  158    uint32 _threadIDCounter;
  159    Hashtable<uint32, ThreadPoolThreadRef> _availableThreads;
  160    Hashtable<uint32, ThreadPoolThreadRef> _activeThreads;
  161    Hashtable<IThreadPoolClient *, bool> _registeredClients;  // registered clients -> (true iff a Thread is currently handling them)
  162    Hashtable<IThreadPoolClient *, Queue<MessageRef> > _pendingMessages;  // Messages ready to be sent to a pool Thread
  163    Hashtable<IThreadPoolClient *, Queue<MessageRef> > _deferredMessages; // Messages to be sent to a pool Thread when the current Thread's Messages are done
  164    Hashtable<IThreadPoolClient *, ConstSocketRef> _waitingForCompletion; // Clients who are blocked in UnregisterClient() waiting for Messages to complete processing
  165 
  166    DECLARE_COUNTED_OBJECT(ThreadPool);
  167 };
  168 
  169 } // end namespace muscle
  170 
  171 #endif