"Fossies" - the Fresh Open Source Software Archive

Member "fuse-3.2.1/lib/fuse_loop_mt.c" (14 Nov 2017, 7957 Bytes) of package /linux/misc/fuse-3.2.1.tar.xz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "fuse_loop_mt.c", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 3.2.0_vs_3.2.1.

    1 /*
    2   FUSE: Filesystem in Userspace
    3   Copyright (C) 2001-2007  Miklos Szeredi <miklos@szeredi.hu>
    4 
    5   Implementation of the multi-threaded FUSE session loop.
    6 
    7   This program can be distributed under the terms of the GNU LGPLv2.
    8   See the file COPYING.LIB.
    9 */
   10 
   11 #include "config.h"
   12 #include "fuse_lowlevel.h"
   13 #include "fuse_misc.h"
   14 #include "fuse_kernel.h"
   15 #include "fuse_i.h"
   16 
   17 #include <stdio.h>
   18 #include <stdlib.h>
   19 #include <string.h>
   20 #include <unistd.h>
   21 #include <signal.h>
   22 #include <semaphore.h>
   23 #include <errno.h>
   24 #include <sys/time.h>
   25 #include <sys/ioctl.h>
   26 #include <assert.h>
   27 
   28 /* Environment var controlling the thread stack size */
   29 #define ENVNAME_THREAD_STACK "FUSE_THREAD_STACK"
   30 
   31 struct fuse_worker {
   32     struct fuse_worker *prev;
   33     struct fuse_worker *next;
   34     pthread_t thread_id;
   35     size_t bufsize;
   36     struct fuse_buf fbuf;
   37     struct fuse_chan *ch;
   38     struct fuse_mt *mt;
   39 };
   40 
   41 struct fuse_mt {
   42     pthread_mutex_t lock;
   43     int numworker;
   44     int numavail;
   45     struct fuse_session *se;
   46     struct fuse_worker main;
   47     sem_t finish;
   48     int exit;
   49     int error;
   50     int clone_fd;
   51     int max_idle;
   52 };
   53 
   54 static struct fuse_chan *fuse_chan_new(int fd)
   55 {
   56     struct fuse_chan *ch = (struct fuse_chan *) malloc(sizeof(*ch));
   57     if (ch == NULL) {
   58         fprintf(stderr, "fuse: failed to allocate channel\n");
   59         return NULL;
   60     }
   61 
   62     memset(ch, 0, sizeof(*ch));
   63     ch->fd = fd;
   64     ch->ctr = 1;
   65     fuse_mutex_init(&ch->lock);
   66 
   67     return ch;
   68 }
   69 
   70 struct fuse_chan *fuse_chan_get(struct fuse_chan *ch)
   71 {
   72     assert(ch->ctr > 0);
   73     pthread_mutex_lock(&ch->lock);
   74     ch->ctr++;
   75     pthread_mutex_unlock(&ch->lock);
   76 
   77     return ch;
   78 }
   79 
   80 void fuse_chan_put(struct fuse_chan *ch)
   81 {
   82     if (ch == NULL)
   83         return;
   84     pthread_mutex_lock(&ch->lock);
   85     ch->ctr--;
   86     if (!ch->ctr) {
   87         pthread_mutex_unlock(&ch->lock);
   88         close(ch->fd);
   89         pthread_mutex_destroy(&ch->lock);
   90         free(ch);
   91     } else
   92         pthread_mutex_unlock(&ch->lock);
   93 }
   94 
   95 static void list_add_worker(struct fuse_worker *w, struct fuse_worker *next)
   96 {
   97     struct fuse_worker *prev = next->prev;
   98     w->next = next;
   99     w->prev = prev;
  100     prev->next = w;
  101     next->prev = w;
  102 }
  103 
  104 static void list_del_worker(struct fuse_worker *w)
  105 {
  106     struct fuse_worker *prev = w->prev;
  107     struct fuse_worker *next = w->next;
  108     prev->next = next;
  109     next->prev = prev;
  110 }
  111 
  112 static int fuse_loop_start_thread(struct fuse_mt *mt);
  113 
  114 static void *fuse_do_work(void *data)
  115 {
  116     struct fuse_worker *w = (struct fuse_worker *) data;
  117     struct fuse_mt *mt = w->mt;
  118 
  119     while (!fuse_session_exited(mt->se)) {
  120         int isforget = 0;
  121         int res;
  122 
  123         pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
  124         res = fuse_session_receive_buf_int(mt->se, &w->fbuf, w->ch);
  125         pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
  126         if (res == -EINTR)
  127             continue;
  128         if (res <= 0) {
  129             if (res < 0) {
  130                 fuse_session_exit(mt->se);
  131                 mt->error = res;
  132             }
  133             break;
  134         }
  135 
  136         pthread_mutex_lock(&mt->lock);
  137         if (mt->exit) {
  138             pthread_mutex_unlock(&mt->lock);
  139             return NULL;
  140         }
  141 
  142         /*
  143          * This disgusting hack is needed so that zillions of threads
  144          * are not created on a burst of FORGET messages
  145          */
  146         if (!(w->fbuf.flags & FUSE_BUF_IS_FD)) {
  147             struct fuse_in_header *in = w->fbuf.mem;
  148 
  149             if (in->opcode == FUSE_FORGET ||
  150                 in->opcode == FUSE_BATCH_FORGET)
  151                 isforget = 1;
  152         }
  153 
  154         if (!isforget)
  155             mt->numavail--;
  156         if (mt->numavail == 0)
  157             fuse_loop_start_thread(mt);
  158         pthread_mutex_unlock(&mt->lock);
  159 
  160         fuse_session_process_buf_int(mt->se, &w->fbuf, w->ch);
  161 
  162         pthread_mutex_lock(&mt->lock);
  163         if (!isforget)
  164             mt->numavail++;
  165         if (mt->numavail > mt->max_idle) {
  166             if (mt->exit) {
  167                 pthread_mutex_unlock(&mt->lock);
  168                 return NULL;
  169             }
  170             list_del_worker(w);
  171             mt->numavail--;
  172             mt->numworker--;
  173             pthread_mutex_unlock(&mt->lock);
  174 
  175             pthread_detach(w->thread_id);
  176             free(w->fbuf.mem);
  177             fuse_chan_put(w->ch);
  178             free(w);
  179             return NULL;
  180         }
  181         pthread_mutex_unlock(&mt->lock);
  182     }
  183 
  184     sem_post(&mt->finish);
  185 
  186     return NULL;
  187 }
  188 
  189 int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg)
  190 {
  191     sigset_t oldset;
  192     sigset_t newset;
  193     int res;
  194     pthread_attr_t attr;
  195     char *stack_size;
  196 
  197     /* Override default stack size */
  198     pthread_attr_init(&attr);
  199     stack_size = getenv(ENVNAME_THREAD_STACK);
  200     if (stack_size && pthread_attr_setstacksize(&attr, atoi(stack_size)))
  201         fprintf(stderr, "fuse: invalid stack size: %s\n", stack_size);
  202 
  203     /* Disallow signal reception in worker threads */
  204     sigemptyset(&newset);
  205     sigaddset(&newset, SIGTERM);
  206     sigaddset(&newset, SIGINT);
  207     sigaddset(&newset, SIGHUP);
  208     sigaddset(&newset, SIGQUIT);
  209     pthread_sigmask(SIG_BLOCK, &newset, &oldset);
  210     res = pthread_create(thread_id, &attr, func, arg);
  211     pthread_sigmask(SIG_SETMASK, &oldset, NULL);
  212     pthread_attr_destroy(&attr);
  213     if (res != 0) {
  214         fprintf(stderr, "fuse: error creating thread: %s\n",
  215             strerror(res));
  216         return -1;
  217     }
  218 
  219     return 0;
  220 }
  221 
  222 static struct fuse_chan *fuse_clone_chan(struct fuse_mt *mt)
  223 {
  224     int res;
  225     int clonefd;
  226     uint32_t masterfd;
  227     struct fuse_chan *newch;
  228     const char *devname = "/dev/fuse";
  229 
  230 #ifndef O_CLOEXEC
  231 #define O_CLOEXEC 0
  232 #endif
  233     clonefd = open(devname, O_RDWR | O_CLOEXEC);
  234     if (clonefd == -1) {
  235         fprintf(stderr, "fuse: failed to open %s: %s\n", devname,
  236             strerror(errno));
  237         return NULL;
  238     }
  239     fcntl(clonefd, F_SETFD, FD_CLOEXEC);
  240 
  241     masterfd = mt->se->fd;
  242     res = ioctl(clonefd, FUSE_DEV_IOC_CLONE, &masterfd);
  243     if (res == -1) {
  244         fprintf(stderr, "fuse: failed to clone device fd: %s\n",
  245             strerror(errno));
  246         close(clonefd);
  247         return NULL;
  248     }
  249     newch = fuse_chan_new(clonefd);
  250     if (newch == NULL)
  251         close(clonefd);
  252 
  253     return newch;
  254 }
  255 
  256 static int fuse_loop_start_thread(struct fuse_mt *mt)
  257 {
  258     int res;
  259 
  260     struct fuse_worker *w = malloc(sizeof(struct fuse_worker));
  261     if (!w) {
  262         fprintf(stderr, "fuse: failed to allocate worker structure\n");
  263         return -1;
  264     }
  265     memset(w, 0, sizeof(struct fuse_worker));
  266     w->fbuf.mem = NULL;
  267     w->mt = mt;
  268 
  269     w->ch = NULL;
  270     if (mt->clone_fd) {
  271         w->ch = fuse_clone_chan(mt);
  272         if(!w->ch) {
  273             /* Don't attempt this again */
  274             fprintf(stderr, "fuse: trying to continue "
  275                 "without -o clone_fd.\n");
  276             mt->clone_fd = 0;
  277         }
  278     }
  279 
  280     res = fuse_start_thread(&w->thread_id, fuse_do_work, w);
  281     if (res == -1) {
  282         fuse_chan_put(w->ch);
  283         free(w);
  284         return -1;
  285     }
  286     list_add_worker(w, &mt->main);
  287     mt->numavail ++;
  288     mt->numworker ++;
  289 
  290     return 0;
  291 }
  292 
  293 static void fuse_join_worker(struct fuse_mt *mt, struct fuse_worker *w)
  294 {
  295     pthread_join(w->thread_id, NULL);
  296     pthread_mutex_lock(&mt->lock);
  297     list_del_worker(w);
  298     pthread_mutex_unlock(&mt->lock);
  299     free(w->fbuf.mem);
  300     fuse_chan_put(w->ch);
  301     free(w);
  302 }
  303 
  304 FUSE_SYMVER(".symver fuse_session_loop_mt_32,fuse_session_loop_mt@@FUSE_3.2");
  305 int fuse_session_loop_mt_32(struct fuse_session *se, struct fuse_loop_config *config)
  306 {
  307     int err;
  308     struct fuse_mt mt;
  309     struct fuse_worker *w;
  310 
  311     memset(&mt, 0, sizeof(struct fuse_mt));
  312     mt.se = se;
  313     mt.clone_fd = config->clone_fd;
  314     mt.error = 0;
  315     mt.numworker = 0;
  316     mt.numavail = 0;
  317     mt.max_idle = config->max_idle_threads;
  318     mt.main.thread_id = pthread_self();
  319     mt.main.prev = mt.main.next = &mt.main;
  320     sem_init(&mt.finish, 0, 0);
  321     fuse_mutex_init(&mt.lock);
  322 
  323     pthread_mutex_lock(&mt.lock);
  324     err = fuse_loop_start_thread(&mt);
  325     pthread_mutex_unlock(&mt.lock);
  326     if (!err) {
  327         /* sem_wait() is interruptible */
  328         while (!fuse_session_exited(se))
  329             sem_wait(&mt.finish);
  330 
  331         pthread_mutex_lock(&mt.lock);
  332         for (w = mt.main.next; w != &mt.main; w = w->next)
  333             pthread_cancel(w->thread_id);
  334         mt.exit = 1;
  335         pthread_mutex_unlock(&mt.lock);
  336 
  337         while (mt.main.next != &mt.main)
  338             fuse_join_worker(&mt, mt.main.next);
  339 
  340         err = mt.error;
  341     }
  342 
  343     pthread_mutex_destroy(&mt.lock);
  344     sem_destroy(&mt.finish);
  345     if(se->error != 0)
  346         err = se->error;
  347     fuse_session_reset(se);
  348     return err;
  349 }
  350 
  351 int fuse_session_loop_mt_31(struct fuse_session *se, int clone_fd);
  352 FUSE_SYMVER(".symver fuse_session_loop_mt_31,fuse_session_loop_mt@FUSE_3.0");
  353 int fuse_session_loop_mt_31(struct fuse_session *se, int clone_fd)
  354 {
  355     struct fuse_loop_config config;
  356     config.clone_fd = clone_fd;
  357     config.max_idle_threads = 10;
  358     return fuse_session_loop_mt_32(se, &config);
  359 }