"Fossies" - the Fresh Open Source Software Archive 
Member "unrar/threadpool.cpp" (4 May 2022, 5423 Bytes) of package /linux/misc/unrarsrc-6.1.7.tar.gz:
As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style:
standard), with prefixed line numbers and
a code folding option.
Alternatively, you can
view or
download the uninterpreted source code file here.
For more information about "threadpool.cpp" see the
Fossies "Dox" file reference documentation.
1 #include "rar.hpp"
2
3 #ifdef RAR_SMP
4 #include "threadmisc.cpp"
5
#if defined(_WIN_ALL)
// Priority handed to SetThreadPriority for every pool thread at creation time.
// While it stays at THREAD_PRIORITY_NORMAL, created threads are left untouched.
int ThreadPool::ThreadPriority = THREAD_PRIORITY_NORMAL;
#endif
9
10 ThreadPool::ThreadPool(uint MaxThreads)
11 {
12 MaxAllowedThreads = MaxThreads;
13 if (MaxAllowedThreads>MaxPoolThreads)
14 MaxAllowedThreads=MaxPoolThreads;
15 if (MaxAllowedThreads==0)
16 MaxAllowedThreads=1;
17
18 ThreadsCreatedCount=0;
19
20 // If we have more threads than queue size, we'll hang on pool destroying,
21 // not releasing all waiting threads.
22 if (MaxAllowedThreads>ASIZE(TaskQueue))
23 MaxAllowedThreads=ASIZE(TaskQueue);
24
25 Closing=false;
26
27 bool Success = CriticalSectionCreate(&CritSection);
28 #ifdef _WIN_ALL
29 QueuedTasksCnt=CreateSemaphore(NULL,0,ASIZE(TaskQueue),NULL);
30 NoneActive=CreateEvent(NULL,TRUE,TRUE,NULL);
31 Success=Success && QueuedTasksCnt!=NULL && NoneActive!=NULL;
32 #elif defined(_UNIX)
33 AnyActive = false;
34 QueuedTasksCnt = 0;
35 Success=Success && pthread_cond_init(&AnyActiveCond,NULL)==0 &&
36 pthread_mutex_init(&AnyActiveMutex,NULL)==0 &&
37 pthread_cond_init(&QueuedTasksCntCond,NULL)==0 &&
38 pthread_mutex_init(&QueuedTasksCntMutex,NULL)==0;
39 #endif
40 if (!Success)
41 {
42 ErrHandler.GeneralErrMsg(L"\nThread pool initialization failed.");
43 ErrHandler.Exit(RARX_FATAL);
44 }
45
46 QueueTop = 0;
47 QueueBottom = 0;
48 ActiveThreads = 0;
49 }
50
51
52 ThreadPool::~ThreadPool()
53 {
54 WaitDone();
55 Closing=true;
56
57 #ifdef _WIN_ALL
58 ReleaseSemaphore(QueuedTasksCnt,ASIZE(TaskQueue),NULL);
59 #elif defined(_UNIX)
60 // Threads still can access QueuedTasksCnt for a short time after WaitDone(),
61 // so lock is required. We would occassionally hang without it.
62 pthread_mutex_lock(&QueuedTasksCntMutex);
63 QueuedTasksCnt+=ASIZE(TaskQueue);
64 pthread_mutex_unlock(&QueuedTasksCntMutex);
65
66 pthread_cond_broadcast(&QueuedTasksCntCond);
67 #endif
68
69 for(uint I=0;I<ThreadsCreatedCount;I++)
70 {
71 #ifdef _WIN_ALL
72 // Waiting until the thread terminates.
73 CWaitForSingleObject(ThreadHandles[I]);
74 #endif
75 // Close the thread handle. In Unix it results in pthread_join call,
76 // which also waits for thread termination.
77 ThreadClose(ThreadHandles[I]);
78 }
79
80 CriticalSectionDelete(&CritSection);
81 #ifdef _WIN_ALL
82 CloseHandle(QueuedTasksCnt);
83 CloseHandle(NoneActive);
84 #elif defined(_UNIX)
85 pthread_cond_destroy(&AnyActiveCond);
86 pthread_mutex_destroy(&AnyActiveMutex);
87 pthread_cond_destroy(&QueuedTasksCntCond);
88 pthread_mutex_destroy(&QueuedTasksCntMutex);
89 #endif
90 }
91
92
93 void ThreadPool::CreateThreads()
94 {
95 for(uint I=0;I<MaxAllowedThreads;I++)
96 {
97 ThreadHandles[I] = ThreadCreate(PoolThread, this);
98 ThreadsCreatedCount++;
99 #ifdef _WIN_ALL
100 if (ThreadPool::ThreadPriority!=THREAD_PRIORITY_NORMAL)
101 SetThreadPriority(ThreadHandles[I],ThreadPool::ThreadPriority);
102 #endif
103 }
104 }
105
106
107 NATIVE_THREAD_TYPE ThreadPool::PoolThread(void *Param)
108 {
109 ((ThreadPool*)Param)->PoolThreadLoop();
110 return 0;
111 }
112
113
114 void ThreadPool::PoolThreadLoop()
115 {
116 QueueEntry Task;
117 while (GetQueuedTask(&Task))
118 {
119 Task.Proc(Task.Param);
120
121 CriticalSectionStart(&CritSection);
122 if (--ActiveThreads == 0)
123 {
124 #ifdef _WIN_ALL
125 SetEvent(NoneActive);
126 #elif defined(_UNIX)
127 pthread_mutex_lock(&AnyActiveMutex);
128 AnyActive=false;
129 pthread_cond_signal(&AnyActiveCond);
130 pthread_mutex_unlock(&AnyActiveMutex);
131 #endif
132 }
133 CriticalSectionEnd(&CritSection);
134 }
135 }
136
137
138 bool ThreadPool::GetQueuedTask(QueueEntry *Task)
139 {
140 #ifdef _WIN_ALL
141 CWaitForSingleObject(QueuedTasksCnt);
142 #elif defined(_UNIX)
143 pthread_mutex_lock(&QueuedTasksCntMutex);
144 while (QueuedTasksCnt==0)
145 cpthread_cond_wait(&QueuedTasksCntCond,&QueuedTasksCntMutex);
146 QueuedTasksCnt--;
147 pthread_mutex_unlock(&QueuedTasksCntMutex);
148 #endif
149
150 if (Closing)
151 return false;
152
153 CriticalSectionStart(&CritSection);
154
155 *Task = TaskQueue[QueueBottom];
156 QueueBottom = (QueueBottom + 1) % ASIZE(TaskQueue);
157
158 CriticalSectionEnd(&CritSection);
159
160 return true;
161 }
162
163
164 // Add task to queue. We assume that it is always called from main thread,
165 // it allows to avoid any locks here. We process collected tasks only
166 // when WaitDone is called.
167 void ThreadPool::AddTask(PTHREAD_PROC Proc,void *Data)
168 {
169 if (ThreadsCreatedCount == 0)
170 CreateThreads();
171
172 // If queue is full, wait until it is empty.
173 if (ActiveThreads>=ASIZE(TaskQueue))
174 WaitDone();
175
176 TaskQueue[QueueTop].Proc = Proc;
177 TaskQueue[QueueTop].Param = Data;
178 QueueTop = (QueueTop + 1) % ASIZE(TaskQueue);
179 ActiveThreads++;
180 }
181
182
183 // Start queued tasks and wait until all threads are inactive.
184 // We assume that it is always called from main thread, when pool threads
185 // are sleeping yet.
186 void ThreadPool::WaitDone()
187 {
188 if (ActiveThreads==0)
189 return;
190 #ifdef _WIN_ALL
191 ResetEvent(NoneActive);
192 ReleaseSemaphore(QueuedTasksCnt,ActiveThreads,NULL);
193 CWaitForSingleObject(NoneActive);
194 #elif defined(_UNIX)
195 AnyActive=true;
196
197 // Threads reset AnyActive before accessing QueuedTasksCnt and even
198 // preceding WaitDone() call does not guarantee that some slow thread
199 // is not accessing QueuedTasksCnt now. So lock is necessary.
200 pthread_mutex_lock(&QueuedTasksCntMutex);
201 QueuedTasksCnt+=ActiveThreads;
202 pthread_mutex_unlock(&QueuedTasksCntMutex);
203
204 pthread_cond_broadcast(&QueuedTasksCntCond);
205
206 pthread_mutex_lock(&AnyActiveMutex);
207 while (AnyActive)
208 cpthread_cond_wait(&AnyActiveCond,&AnyActiveMutex);
209 pthread_mutex_unlock(&AnyActiveMutex);
210 #endif
211 }
212 #endif // RAR_SMP