"Fossies" - the Fresh Open Source Software Archive

Member "unrar/threadpool.cpp" (4 May 2022, 5423 Bytes) of package /linux/misc/unrarsrc-6.1.7.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "threadpool.cpp", see the Fossies "Dox" file reference documentation.

    1 #include "rar.hpp"
    2 
    3 #ifdef RAR_SMP
    4 #include "threadmisc.cpp"
    5 
    6 #ifdef _WIN_ALL
    7 int ThreadPool::ThreadPriority=THREAD_PRIORITY_NORMAL;
    8 #endif
    9 
   10 ThreadPool::ThreadPool(uint MaxThreads)
   11 {
   12   MaxAllowedThreads = MaxThreads;
   13   if (MaxAllowedThreads>MaxPoolThreads)
   14     MaxAllowedThreads=MaxPoolThreads;
   15   if (MaxAllowedThreads==0)
   16     MaxAllowedThreads=1;
   17 
   18   ThreadsCreatedCount=0;
   19 
   20   // If we have more threads than queue size, we'll hang on pool destroying,
   21   // not releasing all waiting threads.
   22   if (MaxAllowedThreads>ASIZE(TaskQueue))
   23     MaxAllowedThreads=ASIZE(TaskQueue);
   24 
   25   Closing=false;
   26 
   27   bool Success = CriticalSectionCreate(&CritSection);
   28 #ifdef _WIN_ALL
   29   QueuedTasksCnt=CreateSemaphore(NULL,0,ASIZE(TaskQueue),NULL);
   30   NoneActive=CreateEvent(NULL,TRUE,TRUE,NULL);
   31   Success=Success && QueuedTasksCnt!=NULL && NoneActive!=NULL;
   32 #elif defined(_UNIX)
   33   AnyActive = false;
   34   QueuedTasksCnt = 0;
   35   Success=Success && pthread_cond_init(&AnyActiveCond,NULL)==0 &&
   36           pthread_mutex_init(&AnyActiveMutex,NULL)==0 &&
   37           pthread_cond_init(&QueuedTasksCntCond,NULL)==0 &&
   38           pthread_mutex_init(&QueuedTasksCntMutex,NULL)==0;
   39 #endif
   40   if (!Success)
   41   {
   42     ErrHandler.GeneralErrMsg(L"\nThread pool initialization failed.");
   43     ErrHandler.Exit(RARX_FATAL);
   44   }
   45 
   46   QueueTop = 0;
   47   QueueBottom = 0;
   48   ActiveThreads = 0;
   49 }
   50 
   51 
   52 ThreadPool::~ThreadPool()
   53 {
   54   WaitDone();
   55   Closing=true;
   56 
   57 #ifdef _WIN_ALL
   58   ReleaseSemaphore(QueuedTasksCnt,ASIZE(TaskQueue),NULL);
   59 #elif defined(_UNIX)
   60   // Threads still can access QueuedTasksCnt for a short time after WaitDone(),
   61   // so lock is required. We would occassionally hang without it.
   62   pthread_mutex_lock(&QueuedTasksCntMutex);
   63   QueuedTasksCnt+=ASIZE(TaskQueue);
   64   pthread_mutex_unlock(&QueuedTasksCntMutex);
   65 
   66   pthread_cond_broadcast(&QueuedTasksCntCond);
   67 #endif
   68 
   69   for(uint I=0;I<ThreadsCreatedCount;I++)
   70   {
   71 #ifdef _WIN_ALL
   72     // Waiting until the thread terminates.
   73     CWaitForSingleObject(ThreadHandles[I]);
   74 #endif
   75     // Close the thread handle. In Unix it results in pthread_join call,
   76     // which also waits for thread termination.
   77     ThreadClose(ThreadHandles[I]);
   78   }
   79 
   80   CriticalSectionDelete(&CritSection);
   81 #ifdef _WIN_ALL
   82   CloseHandle(QueuedTasksCnt);
   83   CloseHandle(NoneActive);
   84 #elif defined(_UNIX)
   85   pthread_cond_destroy(&AnyActiveCond);
   86   pthread_mutex_destroy(&AnyActiveMutex);
   87   pthread_cond_destroy(&QueuedTasksCntCond);
   88   pthread_mutex_destroy(&QueuedTasksCntMutex);
   89 #endif
   90 }
   91 
   92 
   93 void ThreadPool::CreateThreads()
   94 {
   95   for(uint I=0;I<MaxAllowedThreads;I++)
   96   {
   97     ThreadHandles[I] = ThreadCreate(PoolThread, this);
   98     ThreadsCreatedCount++;
   99 #ifdef _WIN_ALL
  100     if (ThreadPool::ThreadPriority!=THREAD_PRIORITY_NORMAL)
  101       SetThreadPriority(ThreadHandles[I],ThreadPool::ThreadPriority);
  102 #endif
  103   }
  104 }
  105 
  106 
  107 NATIVE_THREAD_TYPE ThreadPool::PoolThread(void *Param)
  108 {
  109   ((ThreadPool*)Param)->PoolThreadLoop();
  110   return 0;
  111 }
  112 
  113 
  114 void ThreadPool::PoolThreadLoop()
  115 {
  116   QueueEntry Task;
  117   while (GetQueuedTask(&Task))
  118   {
  119     Task.Proc(Task.Param);
  120     
  121     CriticalSectionStart(&CritSection); 
  122     if (--ActiveThreads == 0)
  123     {
  124 #ifdef _WIN_ALL
  125       SetEvent(NoneActive);
  126 #elif defined(_UNIX)
  127       pthread_mutex_lock(&AnyActiveMutex);
  128       AnyActive=false;
  129       pthread_cond_signal(&AnyActiveCond);
  130       pthread_mutex_unlock(&AnyActiveMutex);
  131 #endif
  132     }
  133     CriticalSectionEnd(&CritSection); 
  134   }
  135 }
  136 
  137 
  138 bool ThreadPool::GetQueuedTask(QueueEntry *Task)
  139 {
  140 #ifdef _WIN_ALL
  141   CWaitForSingleObject(QueuedTasksCnt);
  142 #elif defined(_UNIX)
  143   pthread_mutex_lock(&QueuedTasksCntMutex);
  144   while (QueuedTasksCnt==0)
  145     cpthread_cond_wait(&QueuedTasksCntCond,&QueuedTasksCntMutex);
  146   QueuedTasksCnt--;
  147   pthread_mutex_unlock(&QueuedTasksCntMutex);
  148 #endif
  149 
  150   if (Closing)
  151     return false;
  152 
  153   CriticalSectionStart(&CritSection); 
  154 
  155   *Task = TaskQueue[QueueBottom];
  156   QueueBottom = (QueueBottom + 1) % ASIZE(TaskQueue);
  157 
  158   CriticalSectionEnd(&CritSection); 
  159 
  160   return true;
  161 }
  162 
  163 
  164 // Add task to queue. We assume that it is always called from main thread,
  165 // it allows to avoid any locks here. We process collected tasks only
  166 // when WaitDone is called.
  167 void ThreadPool::AddTask(PTHREAD_PROC Proc,void *Data)
  168 {
  169   if (ThreadsCreatedCount == 0)
  170     CreateThreads();
  171   
  172   // If queue is full, wait until it is empty.
  173   if (ActiveThreads>=ASIZE(TaskQueue))
  174     WaitDone();
  175 
  176   TaskQueue[QueueTop].Proc = Proc;
  177   TaskQueue[QueueTop].Param = Data;
  178   QueueTop = (QueueTop + 1) % ASIZE(TaskQueue);
  179   ActiveThreads++;
  180 }
  181 
  182 
  183 // Start queued tasks and wait until all threads are inactive.
  184 // We assume that it is always called from main thread, when pool threads
  185 // are sleeping yet.
  186 void ThreadPool::WaitDone()
  187 {
  188   if (ActiveThreads==0)
  189     return;
  190 #ifdef _WIN_ALL
  191   ResetEvent(NoneActive);
  192   ReleaseSemaphore(QueuedTasksCnt,ActiveThreads,NULL);
  193   CWaitForSingleObject(NoneActive);
  194 #elif defined(_UNIX)
  195   AnyActive=true;
  196 
  197   // Threads reset AnyActive before accessing QueuedTasksCnt and even
  198   // preceding WaitDone() call does not guarantee that some slow thread
  199   // is not accessing QueuedTasksCnt now. So lock is necessary.
  200   pthread_mutex_lock(&QueuedTasksCntMutex);
  201   QueuedTasksCnt+=ActiveThreads;
  202   pthread_mutex_unlock(&QueuedTasksCntMutex);
  203 
  204   pthread_cond_broadcast(&QueuedTasksCntCond);
  205 
  206   pthread_mutex_lock(&AnyActiveMutex);
  207   while (AnyActive)
  208     cpthread_cond_wait(&AnyActiveCond,&AnyActiveMutex);
  209   pthread_mutex_unlock(&AnyActiveMutex);
  210 #endif
  211 }
  212 #endif // RAR_SMP