"Fossies" - the Fresh Open Source Software Archive

Member "fuse-3.10.4/doc/html/fuse-3_810_81_2lib_2fuse__loop__mt_8c_source.html" (9 Jun 2021, 54026 Bytes) of package /linux/misc/fuse-3.10.4.tar.xz:


Caution: In this restricted "Fossies" environment the current HTML page may not be correctly presentated and may have some non-functional links. You can here alternatively try to browse the pure source code or just view or download the uninterpreted raw source code. If the rendering is insufficient you may try to find and view the page on the project site itself.

libfuse
fuse_loop_mt.c
1 /*
2  FUSE: Filesystem in Userspace
3  Copyright (C) 2001-2007 Miklos Szeredi <miklos@szeredi.hu>
4 
5  Implementation of the multi-threaded FUSE session loop.
6 
7  This program can be distributed under the terms of the GNU LGPLv2.
8  See the file COPYING.LIB.
9 */
10 
11 #include "config.h"
12 #include "fuse_lowlevel.h"
13 #include "fuse_misc.h"
14 #include "fuse_kernel.h"
15 #include "fuse_i.h"
16 
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include <string.h>
20 #include <unistd.h>
21 #include <signal.h>
22 #include <semaphore.h>
23 #include <errno.h>
24 #include <sys/time.h>
25 #include <sys/ioctl.h>
26 #include <assert.h>
27 
28 /* Environment var controlling the thread stack size */
29 #define ENVNAME_THREAD_STACK "FUSE_THREAD_STACK"
30 
31 struct fuse_worker {
32  struct fuse_worker *prev;
33  struct fuse_worker *next;
34  pthread_t thread_id;
35  size_t bufsize;
36 
37  // We need to include fuse_buf so that we can properly free
38  // it when a thread is terminated by pthread_cancel().
39  struct fuse_buf fbuf;
40  struct fuse_chan *ch;
41  struct fuse_mt *mt;
42 };
43 
44 struct fuse_mt {
45  pthread_mutex_t lock;
46  int numworker;
47  int numavail;
48  struct fuse_session *se;
49  struct fuse_worker main;
50  sem_t finish;
51  int exit;
52  int error;
53  int clone_fd;
54  int max_idle;
55 };
56 
57 static struct fuse_chan *fuse_chan_new(int fd)
58 {
59  struct fuse_chan *ch = (struct fuse_chan *) malloc(sizeof(*ch));
60  if (ch == NULL) {
61  fuse_log(FUSE_LOG_ERR, "fuse: failed to allocate channel\n");
62  return NULL;
63  }
64 
65  memset(ch, 0, sizeof(*ch));
66  ch->fd = fd;
67  ch->ctr = 1;
68  pthread_mutex_init(&ch->lock, NULL);
69 
70  return ch;
71 }
72 
73 struct fuse_chan *fuse_chan_get(struct fuse_chan *ch)
74 {
75  assert(ch->ctr > 0);
76  pthread_mutex_lock(&ch->lock);
77  ch->ctr++;
78  pthread_mutex_unlock(&ch->lock);
79 
80  return ch;
81 }
82 
83 void fuse_chan_put(struct fuse_chan *ch)
84 {
85  if (ch == NULL)
86  return;
87  pthread_mutex_lock(&ch->lock);
88  ch->ctr--;
89  if (!ch->ctr) {
90  pthread_mutex_unlock(&ch->lock);
91  close(ch->fd);
92  pthread_mutex_destroy(&ch->lock);
93  free(ch);
94  } else
95  pthread_mutex_unlock(&ch->lock);
96 }
97 
98 static void list_add_worker(struct fuse_worker *w, struct fuse_worker *next)
99 {
100  struct fuse_worker *prev = next->prev;
101  w->next = next;
102  w->prev = prev;
103  prev->next = w;
104  next->prev = w;
105 }
106 
107 static void list_del_worker(struct fuse_worker *w)
108 {
109  struct fuse_worker *prev = w->prev;
110  struct fuse_worker *next = w->next;
111  prev->next = next;
112  next->prev = prev;
113 }
114 
115 static int fuse_loop_start_thread(struct fuse_mt *mt);
116 
117 static void *fuse_do_work(void *data)
118 {
119  struct fuse_worker *w = (struct fuse_worker *) data;
120  struct fuse_mt *mt = w->mt;
121 
122  while (!fuse_session_exited(mt->se)) {
123  int isforget = 0;
124  int res;
125 
126  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
127  res = fuse_session_receive_buf_int(mt->se, &w->fbuf, w->ch);
128  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
129  if (res == -EINTR)
130  continue;
131  if (res <= 0) {
132  if (res < 0) {
133  fuse_session_exit(mt->se);
134  mt->error = res;
135  }
136  break;
137  }
138 
139  pthread_mutex_lock(&mt->lock);
140  if (mt->exit) {
141  pthread_mutex_unlock(&mt->lock);
142  return NULL;
143  }
144 
145  /*
146  * This disgusting hack is needed so that zillions of threads
147  * are not created on a burst of FORGET messages
148  */
149  if (!(w->fbuf.flags & FUSE_BUF_IS_FD)) {
150  struct fuse_in_header *in = w->fbuf.mem;
151 
152  if (in->opcode == FUSE_FORGET ||
153  in->opcode == FUSE_BATCH_FORGET)
154  isforget = 1;
155  }
156 
157  if (!isforget)
158  mt->numavail--;
159  if (mt->numavail == 0)
160  fuse_loop_start_thread(mt);
161  pthread_mutex_unlock(&mt->lock);
162 
163  fuse_session_process_buf_int(mt->se, &w->fbuf, w->ch);
164 
165  pthread_mutex_lock(&mt->lock);
166  if (!isforget)
167  mt->numavail++;
168  if (mt->numavail > mt->max_idle) {
169  if (mt->exit) {
170  pthread_mutex_unlock(&mt->lock);
171  return NULL;
172  }
173  list_del_worker(w);
174  mt->numavail--;
175  mt->numworker--;
176  pthread_mutex_unlock(&mt->lock);
177 
178  pthread_detach(w->thread_id);
179  free(w->fbuf.mem);
180  fuse_chan_put(w->ch);
181  free(w);
182  return NULL;
183  }
184  pthread_mutex_unlock(&mt->lock);
185  }
186 
187  sem_post(&mt->finish);
188 
189  return NULL;
190 }
191 
192 int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg)
193 {
194  sigset_t oldset;
195  sigset_t newset;
196  int res;
197  pthread_attr_t attr;
198  char *stack_size;
199 
200  /* Override default stack size */
201  pthread_attr_init(&attr);
202  stack_size = getenv(ENVNAME_THREAD_STACK);
203  if (stack_size && pthread_attr_setstacksize(&attr, atoi(stack_size)))
204  fuse_log(FUSE_LOG_ERR, "fuse: invalid stack size: %s\n", stack_size);
205 
206  /* Disallow signal reception in worker threads */
207  sigemptyset(&newset);
208  sigaddset(&newset, SIGTERM);
209  sigaddset(&newset, SIGINT);
210  sigaddset(&newset, SIGHUP);
211  sigaddset(&newset, SIGQUIT);
212  pthread_sigmask(SIG_BLOCK, &newset, &oldset);
213  res = pthread_create(thread_id, &attr, func, arg);
214  pthread_sigmask(SIG_SETMASK, &oldset, NULL);
215  pthread_attr_destroy(&attr);
216  if (res != 0) {
217  fuse_log(FUSE_LOG_ERR, "fuse: error creating thread: %s\n",
218  strerror(res));
219  return -1;
220  }
221 
222  return 0;
223 }
224 
225 static struct fuse_chan *fuse_clone_chan(struct fuse_mt *mt)
226 {
227  int res;
228  int clonefd;
229  uint32_t masterfd;
230  struct fuse_chan *newch;
231  const char *devname = "/dev/fuse";
232 
233 #ifndef O_CLOEXEC
234 #define O_CLOEXEC 0
235 #endif
236  clonefd = open(devname, O_RDWR | O_CLOEXEC);
237  if (clonefd == -1) {
238  fuse_log(FUSE_LOG_ERR, "fuse: failed to open %s: %s\n", devname,
239  strerror(errno));
240  return NULL;
241  }
242  fcntl(clonefd, F_SETFD, FD_CLOEXEC);
243 
244  masterfd = mt->se->fd;
245  res = ioctl(clonefd, FUSE_DEV_IOC_CLONE, &masterfd);
246  if (res == -1) {
247  fuse_log(FUSE_LOG_ERR, "fuse: failed to clone device fd: %s\n",
248  strerror(errno));
249  close(clonefd);
250  return NULL;
251  }
252  newch = fuse_chan_new(clonefd);
253  if (newch == NULL)
254  close(clonefd);
255 
256  return newch;
257 }
258 
259 static int fuse_loop_start_thread(struct fuse_mt *mt)
260 {
261  int res;
262 
263  struct fuse_worker *w = malloc(sizeof(struct fuse_worker));
264  if (!w) {
265  fuse_log(FUSE_LOG_ERR, "fuse: failed to allocate worker structure\n");
266  return -1;
267  }
268  memset(w, 0, sizeof(struct fuse_worker));
269  w->fbuf.mem = NULL;
270  w->mt = mt;
271 
272  w->ch = NULL;
273  if (mt->clone_fd) {
274  w->ch = fuse_clone_chan(mt);
275  if(!w->ch) {
276  /* Don't attempt this again */
277  fuse_log(FUSE_LOG_ERR, "fuse: trying to continue "
278  "without -o clone_fd.\n");
279  mt->clone_fd = 0;
280  }
281  }
282 
283  res = fuse_start_thread(&w->thread_id, fuse_do_work, w);
284  if (res == -1) {
285  fuse_chan_put(w->ch);
286  free(w);
287  return -1;
288  }
289  list_add_worker(w, &mt->main);
290  mt->numavail ++;
291  mt->numworker ++;
292 
293  return 0;
294 }
295 
296 static void fuse_join_worker(struct fuse_mt *mt, struct fuse_worker *w)
297 {
298  pthread_join(w->thread_id, NULL);
299  pthread_mutex_lock(&mt->lock);
300  list_del_worker(w);
301  pthread_mutex_unlock(&mt->lock);
302  free(w->fbuf.mem);
303  fuse_chan_put(w->ch);
304  free(w);
305 }
306 
307 FUSE_SYMVER("fuse_session_loop_mt_32", "fuse_session_loop_mt@@FUSE_3.2")
308 int fuse_session_loop_mt_32(struct fuse_session *se, struct fuse_loop_config *config)
309 {
310  int err;
311  struct fuse_mt mt;
312  struct fuse_worker *w;
313 
314  memset(&mt, 0, sizeof(struct fuse_mt));
315  mt.se = se;
316  mt.clone_fd = config->clone_fd;
317  mt.error = 0;
318  mt.numworker = 0;
319  mt.numavail = 0;
320  mt.max_idle = config->max_idle_threads;
321  mt.main.thread_id = pthread_self();
322  mt.main.prev = mt.main.next = &mt.main;
323  sem_init(&mt.finish, 0, 0);
324  pthread_mutex_init(&mt.lock, NULL);
325 
326  pthread_mutex_lock(&mt.lock);
327  err = fuse_loop_start_thread(&mt);
328  pthread_mutex_unlock(&mt.lock);
329  if (!err) {
330  /* sem_wait() is interruptible */
331  while (!fuse_session_exited(se))
332  sem_wait(&mt.finish);
333 
334  pthread_mutex_lock(&mt.lock);
335  for (w = mt.main.next; w != &mt.main; w = w->next)
336  pthread_cancel(w->thread_id);
337  mt.exit = 1;
338  pthread_mutex_unlock(&mt.lock);
339 
340  while (mt.main.next != &mt.main)
341  fuse_join_worker(&mt, mt.main.next);
342 
343  err = mt.error;
344  }
345 
346  pthread_mutex_destroy(&mt.lock);
347  sem_destroy(&mt.finish);
348  if(se->error != 0)
349  err = se->error;
350  fuse_session_reset(se);
351  return err;
352 }
353 
354 int fuse_session_loop_mt_31(struct fuse_session *se, int clone_fd);
355 FUSE_SYMVER("fuse_session_loop_mt_31", "fuse_session_loop_mt@FUSE_3.0")
356 int fuse_session_loop_mt_31(struct fuse_session *se, int clone_fd)
357 {
358  struct fuse_loop_config config;
359  config.clone_fd = clone_fd;
360  config.max_idle_threads = 10;
361  return fuse_session_loop_mt_32(se, &config);
362 }
void fuse_session_exit(struct fuse_session *se)
int fuse_session_exited(struct fuse_session *se)
void fuse_session_reset(struct fuse_session *se)
void fuse_log(enum fuse_log_level level, const char *fmt,...)
Definition: fuse_log.c:33