"Fossies" - the Fresh Open Source Software Archive

Member "fuse-3.10.4/doc/html/fuse__loop__mt_8c_source.html" (9 Jun 2021, 53307 Bytes) of package /linux/misc/fuse-3.10.4.tar.xz:


Caution: In this restricted "Fossies" environment the current HTML page may not be presented correctly and may have some non-functional links. Alternatively, you can try to browse the pure source code or just view or download the uninterpreted raw source code. If the rendering is insufficient, you may try to find and view the page on the project site itself.

libfuse
fuse_loop_mt.c
1 /*
2  FUSE: Filesystem in Userspace
3  Copyright (C) 2001-2007 Miklos Szeredi <miklos@szeredi.hu>
4 
5  Implementation of the multi-threaded FUSE session loop.
6 
7  This program can be distributed under the terms of the GNU LGPLv2.
8  See the file COPYING.LIB.
9 */
10 
11 #include "config.h"
12 #include "fuse_lowlevel.h"
13 #include "fuse_misc.h"
14 #include "fuse_kernel.h"
15 #include "fuse_i.h"
16 
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include <string.h>
20 #include <unistd.h>
21 #include <signal.h>
22 #include <semaphore.h>
23 #include <errno.h>
24 #include <sys/time.h>
25 #include <sys/ioctl.h>
26 #include <assert.h>
27 
28 /* Environment var controlling the thread stack size */
29 #define ENVNAME_THREAD_STACK "FUSE_THREAD_STACK"
30 
31 struct fuse_worker {
32  struct fuse_worker *prev;
33  struct fuse_worker *next;
34  pthread_t thread_id;
35 
36  // We need to include fuse_buf so that we can properly free
37  // it when a thread is terminated by pthread_cancel().
38  struct fuse_buf fbuf;
39  struct fuse_chan *ch;
40  struct fuse_mt *mt;
41 };
42 
43 struct fuse_mt {
44  pthread_mutex_t lock;
45  int numworker;
46  int numavail;
47  struct fuse_session *se;
48  struct fuse_worker main;
49  sem_t finish;
50  int exit;
51  int error;
52  int clone_fd;
53  int max_idle;
54 };
55 
56 static struct fuse_chan *fuse_chan_new(int fd)
57 {
58  struct fuse_chan *ch = (struct fuse_chan *) malloc(sizeof(*ch));
59  if (ch == NULL) {
60  fuse_log(FUSE_LOG_ERR, "fuse: failed to allocate channel\n");
61  return NULL;
62  }
63 
64  memset(ch, 0, sizeof(*ch));
65  ch->fd = fd;
66  ch->ctr = 1;
67  pthread_mutex_init(&ch->lock, NULL);
68 
69  return ch;
70 }
71 
72 struct fuse_chan *fuse_chan_get(struct fuse_chan *ch)
73 {
74  assert(ch->ctr > 0);
75  pthread_mutex_lock(&ch->lock);
76  ch->ctr++;
77  pthread_mutex_unlock(&ch->lock);
78 
79  return ch;
80 }
81 
82 void fuse_chan_put(struct fuse_chan *ch)
83 {
84  if (ch == NULL)
85  return;
86  pthread_mutex_lock(&ch->lock);
87  ch->ctr--;
88  if (!ch->ctr) {
89  pthread_mutex_unlock(&ch->lock);
90  close(ch->fd);
91  pthread_mutex_destroy(&ch->lock);
92  free(ch);
93  } else
94  pthread_mutex_unlock(&ch->lock);
95 }
96 
97 static void list_add_worker(struct fuse_worker *w, struct fuse_worker *next)
98 {
99  struct fuse_worker *prev = next->prev;
100  w->next = next;
101  w->prev = prev;
102  prev->next = w;
103  next->prev = w;
104 }
105 
106 static void list_del_worker(struct fuse_worker *w)
107 {
108  struct fuse_worker *prev = w->prev;
109  struct fuse_worker *next = w->next;
110  prev->next = next;
111  next->prev = prev;
112 }
113 
114 static int fuse_loop_start_thread(struct fuse_mt *mt);
115 
116 static void *fuse_do_work(void *data)
117 {
118  struct fuse_worker *w = (struct fuse_worker *) data;
119  struct fuse_mt *mt = w->mt;
120 
121  while (!fuse_session_exited(mt->se)) {
122  int isforget = 0;
123  int res;
124 
125  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
126  res = fuse_session_receive_buf_int(mt->se, &w->fbuf, w->ch);
127  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
128  if (res == -EINTR)
129  continue;
130  if (res <= 0) {
131  if (res < 0) {
132  fuse_session_exit(mt->se);
133  mt->error = res;
134  }
135  break;
136  }
137 
138  pthread_mutex_lock(&mt->lock);
139  if (mt->exit) {
140  pthread_mutex_unlock(&mt->lock);
141  return NULL;
142  }
143 
144  /*
145  * This disgusting hack is needed so that zillions of threads
146  * are not created on a burst of FORGET messages
147  */
148  if (!(w->fbuf.flags & FUSE_BUF_IS_FD)) {
149  struct fuse_in_header *in = w->fbuf.mem;
150 
151  if (in->opcode == FUSE_FORGET ||
152  in->opcode == FUSE_BATCH_FORGET)
153  isforget = 1;
154  }
155 
156  if (!isforget)
157  mt->numavail--;
158  if (mt->numavail == 0)
159  fuse_loop_start_thread(mt);
160  pthread_mutex_unlock(&mt->lock);
161 
162  fuse_session_process_buf_int(mt->se, &w->fbuf, w->ch);
163 
164  pthread_mutex_lock(&mt->lock);
165  if (!isforget)
166  mt->numavail++;
167  if (mt->numavail > mt->max_idle) {
168  if (mt->exit) {
169  pthread_mutex_unlock(&mt->lock);
170  return NULL;
171  }
172  list_del_worker(w);
173  mt->numavail--;
174  mt->numworker--;
175  pthread_mutex_unlock(&mt->lock);
176 
177  pthread_detach(w->thread_id);
178  free(w->fbuf.mem);
179  fuse_chan_put(w->ch);
180  free(w);
181  return NULL;
182  }
183  pthread_mutex_unlock(&mt->lock);
184  }
185 
186  sem_post(&mt->finish);
187 
188  return NULL;
189 }
190 
191 int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg)
192 {
193  sigset_t oldset;
194  sigset_t newset;
195  int res;
196  pthread_attr_t attr;
197  char *stack_size;
198 
199  /* Override default stack size */
200  pthread_attr_init(&attr);
201  stack_size = getenv(ENVNAME_THREAD_STACK);
202  if (stack_size && pthread_attr_setstacksize(&attr, atoi(stack_size)))
203  fuse_log(FUSE_LOG_ERR, "fuse: invalid stack size: %s\n", stack_size);
204 
205  /* Disallow signal reception in worker threads */
206  sigemptyset(&newset);
207  sigaddset(&newset, SIGTERM);
208  sigaddset(&newset, SIGINT);
209  sigaddset(&newset, SIGHUP);
210  sigaddset(&newset, SIGQUIT);
211  pthread_sigmask(SIG_BLOCK, &newset, &oldset);
212  res = pthread_create(thread_id, &attr, func, arg);
213  pthread_sigmask(SIG_SETMASK, &oldset, NULL);
214  pthread_attr_destroy(&attr);
215  if (res != 0) {
216  fuse_log(FUSE_LOG_ERR, "fuse: error creating thread: %s\n",
217  strerror(res));
218  return -1;
219  }
220 
221  return 0;
222 }
223 
224 static struct fuse_chan *fuse_clone_chan(struct fuse_mt *mt)
225 {
226  int res;
227  int clonefd;
228  uint32_t masterfd;
229  struct fuse_chan *newch;
230  const char *devname = "/dev/fuse";
231 
232 #ifndef O_CLOEXEC
233 #define O_CLOEXEC 0
234 #endif
235  clonefd = open(devname, O_RDWR | O_CLOEXEC);
236  if (clonefd == -1) {
237  fuse_log(FUSE_LOG_ERR, "fuse: failed to open %s: %s\n", devname,
238  strerror(errno));
239  return NULL;
240  }
241  fcntl(clonefd, F_SETFD, FD_CLOEXEC);
242 
243  masterfd = mt->se->fd;
244  res = ioctl(clonefd, FUSE_DEV_IOC_CLONE, &masterfd);
245  if (res == -1) {
246  fuse_log(FUSE_LOG_ERR, "fuse: failed to clone device fd: %s\n",
247  strerror(errno));
248  close(clonefd);
249  return NULL;
250  }
251  newch = fuse_chan_new(clonefd);
252  if (newch == NULL)
253  close(clonefd);
254 
255  return newch;
256 }
257 
258 static int fuse_loop_start_thread(struct fuse_mt *mt)
259 {
260  int res;
261 
262  struct fuse_worker *w = malloc(sizeof(struct fuse_worker));
263  if (!w) {
264  fuse_log(FUSE_LOG_ERR, "fuse: failed to allocate worker structure\n");
265  return -1;
266  }
267  memset(w, 0, sizeof(struct fuse_worker));
268  w->fbuf.mem = NULL;
269  w->mt = mt;
270 
271  w->ch = NULL;
272  if (mt->clone_fd) {
273  w->ch = fuse_clone_chan(mt);
274  if(!w->ch) {
275  /* Don't attempt this again */
276  fuse_log(FUSE_LOG_ERR, "fuse: trying to continue "
277  "without -o clone_fd.\n");
278  mt->clone_fd = 0;
279  }
280  }
281 
282  res = fuse_start_thread(&w->thread_id, fuse_do_work, w);
283  if (res == -1) {
284  fuse_chan_put(w->ch);
285  free(w);
286  return -1;
287  }
288  list_add_worker(w, &mt->main);
289  mt->numavail ++;
290  mt->numworker ++;
291 
292  return 0;
293 }
294 
295 static void fuse_join_worker(struct fuse_mt *mt, struct fuse_worker *w)
296 {
297  pthread_join(w->thread_id, NULL);
298  pthread_mutex_lock(&mt->lock);
299  list_del_worker(w);
300  pthread_mutex_unlock(&mt->lock);
301  free(w->fbuf.mem);
302  fuse_chan_put(w->ch);
303  free(w);
304 }
305 
306 FUSE_SYMVER("fuse_session_loop_mt_32", "fuse_session_loop_mt@@FUSE_3.2")
307 int fuse_session_loop_mt_32(struct fuse_session *se, struct fuse_loop_config *config)
308 {
309  int err;
310  struct fuse_mt mt;
311  struct fuse_worker *w;
312 
313  memset(&mt, 0, sizeof(struct fuse_mt));
314  mt.se = se;
315  mt.clone_fd = config->clone_fd;
316  mt.error = 0;
317  mt.numworker = 0;
318  mt.numavail = 0;
319  mt.max_idle = config->max_idle_threads;
320  mt.main.thread_id = pthread_self();
321  mt.main.prev = mt.main.next = &mt.main;
322  sem_init(&mt.finish, 0, 0);
323  pthread_mutex_init(&mt.lock, NULL);
324 
325  pthread_mutex_lock(&mt.lock);
326  err = fuse_loop_start_thread(&mt);
327  pthread_mutex_unlock(&mt.lock);
328  if (!err) {
329  /* sem_wait() is interruptible */
330  while (!fuse_session_exited(se))
331  sem_wait(&mt.finish);
332 
333  pthread_mutex_lock(&mt.lock);
334  for (w = mt.main.next; w != &mt.main; w = w->next)
335  pthread_cancel(w->thread_id);
336  mt.exit = 1;
337  pthread_mutex_unlock(&mt.lock);
338 
339  while (mt.main.next != &mt.main)
340  fuse_join_worker(&mt, mt.main.next);
341 
342  err = mt.error;
343  }
344 
345  pthread_mutex_destroy(&mt.lock);
346  sem_destroy(&mt.finish);
347  if(se->error != 0)
348  err = se->error;
349  fuse_session_reset(se);
350  return err;
351 }
352 
353 int fuse_session_loop_mt_31(struct fuse_session *se, int clone_fd);
354 FUSE_SYMVER("fuse_session_loop_mt_31", "fuse_session_loop_mt@FUSE_3.0")
355 int fuse_session_loop_mt_31(struct fuse_session *se, int clone_fd)
356 {
357  struct fuse_loop_config config;
358  config.clone_fd = clone_fd;
359  config.max_idle_threads = 10;
360  return fuse_session_loop_mt_32(se, &config);
361 }
void fuse_session_exit(struct fuse_session *se)
int fuse_session_exited(struct fuse_session *se)
void fuse_session_reset(struct fuse_session *se)
void fuse_log(enum fuse_log_level level, const char *fmt,...)
Definition: fuse_log.c:33