unrarsrc  6.1.7
About: unrar extracts, views and tests the contents of archives created with the RAR archiver.
  Fossies Dox: unrarsrc-6.1.7.tar.gz  ("unofficial" and yet experimental doxygen-generated source code documentation)  

threadpool.cpp
Go to the documentation of this file.
1#include "rar.hpp"
2
3#ifdef RAR_SMP
4#include "threadmisc.cpp"
5
6#ifdef _WIN_ALL
7int ThreadPool::ThreadPriority=THREAD_PRIORITY_NORMAL;
8#endif
9
10ThreadPool::ThreadPool(uint MaxThreads)
11{
12 MaxAllowedThreads = MaxThreads;
13 if (MaxAllowedThreads>MaxPoolThreads)
14 MaxAllowedThreads=MaxPoolThreads;
15 if (MaxAllowedThreads==0)
16 MaxAllowedThreads=1;
17
18 ThreadsCreatedCount=0;
19
20 // If we have more threads than queue size, we'll hang on pool destroying,
21 // not releasing all waiting threads.
22 if (MaxAllowedThreads>ASIZE(TaskQueue))
23 MaxAllowedThreads=ASIZE(TaskQueue);
24
25 Closing=false;
26
27 bool Success = CriticalSectionCreate(&CritSection);
28#ifdef _WIN_ALL
29 QueuedTasksCnt=CreateSemaphore(NULL,0,ASIZE(TaskQueue),NULL);
30 NoneActive=CreateEvent(NULL,TRUE,TRUE,NULL);
31 Success=Success && QueuedTasksCnt!=NULL && NoneActive!=NULL;
32#elif defined(_UNIX)
33 AnyActive = false;
34 QueuedTasksCnt = 0;
35 Success=Success && pthread_cond_init(&AnyActiveCond,NULL)==0 &&
36 pthread_mutex_init(&AnyActiveMutex,NULL)==0 &&
37 pthread_cond_init(&QueuedTasksCntCond,NULL)==0 &&
38 pthread_mutex_init(&QueuedTasksCntMutex,NULL)==0;
39#endif
40 if (!Success)
41 {
42 ErrHandler.GeneralErrMsg(L"\nThread pool initialization failed.");
44 }
45
46 QueueTop = 0;
47 QueueBottom = 0;
48 ActiveThreads = 0;
49}
50
51
52ThreadPool::~ThreadPool()
53{
54 WaitDone();
55 Closing=true;
56
57#ifdef _WIN_ALL
58 ReleaseSemaphore(QueuedTasksCnt,ASIZE(TaskQueue),NULL);
59#elif defined(_UNIX)
60 // Threads still can access QueuedTasksCnt for a short time after WaitDone(),
61 // so lock is required. We would occassionally hang without it.
62 pthread_mutex_lock(&QueuedTasksCntMutex);
63 QueuedTasksCnt+=ASIZE(TaskQueue);
64 pthread_mutex_unlock(&QueuedTasksCntMutex);
65
66 pthread_cond_broadcast(&QueuedTasksCntCond);
67#endif
68
69 for(uint I=0;I<ThreadsCreatedCount;I++)
70 {
71#ifdef _WIN_ALL
72 // Waiting until the thread terminates.
73 CWaitForSingleObject(ThreadHandles[I]);
74#endif
75 // Close the thread handle. In Unix it results in pthread_join call,
76 // which also waits for thread termination.
77 ThreadClose(ThreadHandles[I]);
78 }
79
80 CriticalSectionDelete(&CritSection);
81#ifdef _WIN_ALL
82 CloseHandle(QueuedTasksCnt);
83 CloseHandle(NoneActive);
84#elif defined(_UNIX)
85 pthread_cond_destroy(&AnyActiveCond);
86 pthread_mutex_destroy(&AnyActiveMutex);
87 pthread_cond_destroy(&QueuedTasksCntCond);
88 pthread_mutex_destroy(&QueuedTasksCntMutex);
89#endif
90}
91
92
93void ThreadPool::CreateThreads()
94{
95 for(uint I=0;I<MaxAllowedThreads;I++)
96 {
97 ThreadHandles[I] = ThreadCreate(PoolThread, this);
98 ThreadsCreatedCount++;
99#ifdef _WIN_ALL
100 if (ThreadPool::ThreadPriority!=THREAD_PRIORITY_NORMAL)
101 SetThreadPriority(ThreadHandles[I],ThreadPool::ThreadPriority);
102#endif
103 }
104}
105
106
107NATIVE_THREAD_TYPE ThreadPool::PoolThread(void *Param)
108{
109 ((ThreadPool*)Param)->PoolThreadLoop();
110 return 0;
111}
112
113
114void ThreadPool::PoolThreadLoop()
115{
116 QueueEntry Task;
117 while (GetQueuedTask(&Task))
118 {
119 Task.Proc(Task.Param);
120
121 CriticalSectionStart(&CritSection);
122 if (--ActiveThreads == 0)
123 {
124#ifdef _WIN_ALL
125 SetEvent(NoneActive);
126#elif defined(_UNIX)
127 pthread_mutex_lock(&AnyActiveMutex);
128 AnyActive=false;
129 pthread_cond_signal(&AnyActiveCond);
130 pthread_mutex_unlock(&AnyActiveMutex);
131#endif
132 }
133 CriticalSectionEnd(&CritSection);
134 }
135}
136
137
138bool ThreadPool::GetQueuedTask(QueueEntry *Task)
139{
140#ifdef _WIN_ALL
141 CWaitForSingleObject(QueuedTasksCnt);
142#elif defined(_UNIX)
143 pthread_mutex_lock(&QueuedTasksCntMutex);
144 while (QueuedTasksCnt==0)
145 cpthread_cond_wait(&QueuedTasksCntCond,&QueuedTasksCntMutex);
146 QueuedTasksCnt--;
147 pthread_mutex_unlock(&QueuedTasksCntMutex);
148#endif
149
150 if (Closing)
151 return false;
152
153 CriticalSectionStart(&CritSection);
154
155 *Task = TaskQueue[QueueBottom];
156 QueueBottom = (QueueBottom + 1) % ASIZE(TaskQueue);
157
158 CriticalSectionEnd(&CritSection);
159
160 return true;
161}
162
163
164// Add task to queue. We assume that it is always called from main thread,
165// it allows to avoid any locks here. We process collected tasks only
166// when WaitDone is called.
167void ThreadPool::AddTask(PTHREAD_PROC Proc,void *Data)
168{
169 if (ThreadsCreatedCount == 0)
170 CreateThreads();
171
172 // If queue is full, wait until it is empty.
173 if (ActiveThreads>=ASIZE(TaskQueue))
174 WaitDone();
175
176 TaskQueue[QueueTop].Proc = Proc;
177 TaskQueue[QueueTop].Param = Data;
178 QueueTop = (QueueTop + 1) % ASIZE(TaskQueue);
179 ActiveThreads++;
180}
181
182
183// Start queued tasks and wait until all threads are inactive.
184// We assume that it is always called from main thread, when pool threads
185// are sleeping yet.
186void ThreadPool::WaitDone()
187{
188 if (ActiveThreads==0)
189 return;
190#ifdef _WIN_ALL
191 ResetEvent(NoneActive);
192 ReleaseSemaphore(QueuedTasksCnt,ActiveThreads,NULL);
193 CWaitForSingleObject(NoneActive);
194#elif defined(_UNIX)
195 AnyActive=true;
196
197 // Threads reset AnyActive before accessing QueuedTasksCnt and even
198 // preceding WaitDone() call does not guarantee that some slow thread
199 // is not accessing QueuedTasksCnt now. So lock is necessary.
200 pthread_mutex_lock(&QueuedTasksCntMutex);
201 QueuedTasksCnt+=ActiveThreads;
202 pthread_mutex_unlock(&QueuedTasksCntMutex);
203
204 pthread_cond_broadcast(&QueuedTasksCntCond);
205
206 pthread_mutex_lock(&AnyActiveMutex);
207 while (AnyActive)
208 cpthread_cond_wait(&AnyActiveCond,&AnyActiveMutex);
209 pthread_mutex_unlock(&AnyActiveMutex);
210#endif
211}
212#endif // RAR_SMP
ErrorHandler ErrHandler
void GeneralErrMsg(const wchar *fmt,...)
Definition: errhnd.cpp:145
void Exit(RAR_EXIT ExitCode)
Definition: errhnd.cpp:236
@ RARX_FATAL
Definition: errhnd.hpp:8
#define TRUE
Definition: os.hpp:5
#define ASIZE(x)
Definition: rardefs.hpp:10
unsigned int uint
Definition: rartypes.hpp:8
static void CriticalSectionStart(CRITSECT_HANDLE *CritSection)
Definition: threadmisc.cpp:22
static void CriticalSectionEnd(CRITSECT_HANDLE *CritSection)
Definition: threadmisc.cpp:32
static void CriticalSectionDelete(CRITSECT_HANDLE *CritSection)
Definition: threadmisc.cpp:12
static void ThreadClose(THREAD_HANDLE hThread)
Definition: threadmisc.cpp:75
static bool CriticalSectionCreate(CRITSECT_HANDLE *CritSection)
Definition: threadmisc.cpp:1
static THREAD_HANDLE ThreadCreate(NATIVE_THREAD_PTR Proc, void *Data)
Definition: threadmisc.cpp:42
const uint MaxPoolThreads
Definition: threadpool.hpp:5