"Fossies" - the Fresh Open Source Software Archive

Member "memcached-1.6.9/thread.c" (21 Nov 2020, 28897 Bytes) of package /linux/www/memcached-1.6.9.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "thread.c" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 1.6.8_vs_1.6.9.

/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Thread management for memcached.
 */
#include "memcached.h"
#ifdef EXTSTORE
#include "storage.h"
#endif
#include <assert.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#ifdef __sun
#include <atomic.h>
#endif

#ifdef TLS
#include <openssl/ssl.h>
#endif

#define ITEMS_PER_ALLOC 64

/* An item in the connection queue. */
enum conn_queue_item_modes {
    queue_new_conn,   /* brand new connection. */
};
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int               sfd;
    enum conn_states  init_state;
    int               event_flags;
    int               read_buffer_size;
    enum network_transport     transport;
    enum conn_queue_item_modes mode;
    conn *c;
    void    *ssl;
    CQ_ITEM          *next;
};

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
};

/* Locks for cache LRU operations */
pthread_mutex_t lru_locks[POWER_LARGEST];

/* Connection lock around accepting new connections */
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;

#if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif

/* Lock for global stats */
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Lock to cause worker threads to hang up after being woken */
static pthread_mutex_t worker_hang_lock;

/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;

static pthread_mutex_t *item_locks;
/* size of the item lock hash table */
static uint32_t item_lock_count;
unsigned int item_lock_hashpower;
#define hashsize(n) ((unsigned long int)1<<(n))
#define hashmask(n) (hashsize(n)-1)
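/* Illustrative example: with item_lock_hashpower == 13 there are
 * hashsize(13) == 8192 item locks, and a hash value hv maps to the lock
 * at index (hv & hashmask(13)) == (hv & 8191). */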

/*
 * Each libevent instance has a wakeup pipe, which other threads
 * can use to signal that they've put a new connection on its queue.
 */
static LIBEVENT_THREAD *threads;

/*
 * Number of worker threads that have finished setting themselves up.
 */
static int init_count = 0;
static pthread_mutex_t init_lock;
static pthread_cond_t init_cond;


static void thread_libevent_process(evutil_socket_t fd, short which, void *arg);

/* item_lock() must be held for an item before any modifications to either its
 * associated hash bucket, or the structure itself.
 * LRU modifications must hold the item lock, and the LRU lock.
 * LRUs accessing items must item_trylock() before modifying an item.
 * Items accessible from an LRU must not be freed or modified
 * without first locking and removing from the LRU.
 */

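/* Typical caller pattern (illustrative sketch):
 *
 *     uint32_t hv = hash(key, nkey);
 *     item_lock(hv);
 *     ... modify the item or its hash bucket ...
 *     item_unlock(hv);
 */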
void item_lock(uint32_t hv) {
    mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}

void *item_trylock(uint32_t hv) {
    pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
    if (pthread_mutex_trylock(lock) == 0) {
        return lock;
    }
    return NULL;
}

void item_trylock_unlock(void *lock) {
    mutex_unlock((pthread_mutex_t *) lock);
}

void item_unlock(uint32_t hv) {
    mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}

static void wait_for_thread_registration(int nthreads) {
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
}

static void register_thread_initialized(void) {
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);
    /* Force worker threads to pile up if someone wants us to */
    pthread_mutex_lock(&worker_hang_lock);
    pthread_mutex_unlock(&worker_hang_lock);
}

/* Must not be called with any deeper locks held */
void pause_threads(enum pause_thread_types type) {
    char buf[1];
    int i;

    buf[0] = 0;
    switch (type) {
        case PAUSE_ALL_THREADS:
            slabs_rebalancer_pause();
            lru_maintainer_pause();
            lru_crawler_pause();
#ifdef EXTSTORE
            storage_compact_pause();
            storage_write_pause();
#endif
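            /* No break: pausing all threads falls through to pause the workers too. */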
        case PAUSE_WORKER_THREADS:
            buf[0] = 'p';
            pthread_mutex_lock(&worker_hang_lock);
            break;
        case RESUME_ALL_THREADS:
            slabs_rebalancer_resume();
            lru_maintainer_resume();
            lru_crawler_resume();
#ifdef EXTSTORE
            storage_compact_resume();
            storage_write_resume();
#endif
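            /* No break: resuming all threads falls through to release the workers too. */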
        case RESUME_WORKER_THREADS:
            pthread_mutex_unlock(&worker_hang_lock);
            break;
        default:
            fprintf(stderr, "Unknown lock type: %d\n", type);
            assert(1 == 0);
            break;
    }

    /* Only send a message if we have one. */
    if (buf[0] == 0) {
        return;
    }

    pthread_mutex_lock(&init_lock);
    init_count = 0;
    for (i = 0; i < settings.num_threads; i++) {
        if (write(threads[i].notify_send_fd, buf, 1) != 1) {
            perror("Failed writing to notify pipe");
            /* TODO: This is a fatal problem. Can it ever happen temporarily? */
        }
    }
    wait_for_thread_registration(settings.num_threads);
    pthread_mutex_unlock(&init_lock);
}

// MUST not be called with any deeper locks held
// MUST be called only by parent thread
// Note: listener thread is the "main" event base, which has exited its
// loop in order to call this function.
void stop_threads(void) {
    char buf[1];
    int i;

    // assoc can call pause_threads(), so we have to stop it first.
    stop_assoc_maintenance_thread();
    if (settings.verbose > 0)
        fprintf(stderr, "stopped assoc\n");

    if (settings.verbose > 0)
        fprintf(stderr, "asking workers to stop\n");
    buf[0] = 's';
    pthread_mutex_lock(&worker_hang_lock);
    pthread_mutex_lock(&init_lock);
    init_count = 0;
    for (i = 0; i < settings.num_threads; i++) {
        if (write(threads[i].notify_send_fd, buf, 1) != 1) {
            perror("Failed writing to notify pipe");
            /* TODO: This is a fatal problem. Can it ever happen temporarily? */
        }
    }
    wait_for_thread_registration(settings.num_threads);
    pthread_mutex_unlock(&init_lock);

    // All of the workers are hung but haven't done cleanup yet.

    if (settings.verbose > 0)
        fprintf(stderr, "asking background threads to stop\n");

    // stop each side thread.
    // TODO: Verify these all work if the threads are already stopped
    stop_item_crawler_thread(CRAWLER_WAIT);
    if (settings.verbose > 0)
        fprintf(stderr, "stopped lru crawler\n");
    if (settings.lru_maintainer_thread) {
        stop_lru_maintainer_thread();
        if (settings.verbose > 0)
            fprintf(stderr, "stopped maintainer\n");
    }
    if (settings.slab_reassign) {
        stop_slab_maintenance_thread();
        if (settings.verbose > 0)
            fprintf(stderr, "stopped slab mover\n");
    }
    logger_stop();
    if (settings.verbose > 0)
        fprintf(stderr, "stopped logger thread\n");
    stop_conn_timeout_thread();
    if (settings.verbose > 0)
        fprintf(stderr, "stopped idle timeout thread\n");

    // Close all connections then let the workers finally exit.
    if (settings.verbose > 0)
        fprintf(stderr, "closing connections\n");
    conn_close_all();
    pthread_mutex_unlock(&worker_hang_lock);
    if (settings.verbose > 0)
        fprintf(stderr, "reaping worker threads\n");
    for (i = 0; i < settings.num_threads; i++) {
        pthread_join(threads[i].thread_id, NULL);
    }

    if (settings.verbose > 0)
        fprintf(stderr, "all background threads stopped\n");

    // At this point, every background thread must be stopped.
}

/*
 * Initializes a connection queue.
 */
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/*
 * Looks for an item on a connection queue, but doesn't block if there isn't
 * one.
 * Returns the item, or NULL if no item is available
 */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;

    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (NULL != item) {
        cq->head = item->next;
        if (NULL == cq->head)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);

    return item;
}

/*
 * Adds an item to a connection queue.
 */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;

    pthread_mutex_lock(&cq->lock);
    if (NULL == cq->tail)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}

/*
 * Returns a fresh connection queue item.
 */
static CQ_ITEM *cqi_new(void) {
    CQ_ITEM *item = NULL;
    pthread_mutex_lock(&cqi_freelist_lock);
    if (cqi_freelist) {
        item = cqi_freelist;
        cqi_freelist = item->next;
    }
    pthread_mutex_unlock(&cqi_freelist_lock);

    if (NULL == item) {
        int i;

        /* Allocate a bunch of items at once to reduce fragmentation */
        item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
        if (NULL == item) {
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            return NULL;
        }

        /*
         * Link together all the new items except the first one
         * (which we'll return to the caller) for placement on
         * the freelist.
         */
        for (i = 2; i < ITEMS_PER_ALLOC; i++)
            item[i - 1].next = &item[i];

        pthread_mutex_lock(&cqi_freelist_lock);
        item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
        cqi_freelist = &item[1];
        pthread_mutex_unlock(&cqi_freelist_lock);
    }

    return item;
}


/*
 * Frees a connection queue item (adds it to the freelist.)
 */
static void cqi_free(CQ_ITEM *item) {
    pthread_mutex_lock(&cqi_freelist_lock);
    item->next = cqi_freelist;
    cqi_freelist = item;
    pthread_mutex_unlock(&cqi_freelist_lock);
}


/*
 * Creates a worker thread.
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}

/*
 * Sets whether or not we accept new connections.
 */
void accept_new_conns(const bool do_accept) {
    pthread_mutex_lock(&conn_lock);
    do_accept_new_conns(do_accept);
    pthread_mutex_unlock(&conn_lock);
}
/****************************** LIBEVENT THREADS *****************************/

/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
#if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101
    struct event_config *ev_config;
    ev_config = event_config_new();
    event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
    me->base = event_base_new_with_config(ev_config);
    event_config_free(ev_config);
#else
    me->base = event_init();
#endif

    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

    me->rbuf_cache = cache_create("rbuf", READ_BUFFER_SIZE, sizeof(char *), NULL, NULL);
    if (me->rbuf_cache == NULL) {
        fprintf(stderr, "Failed to create read buffer cache\n");
        exit(EXIT_FAILURE);
    }
    // Note: we were cleanly passing in num_threads before, but this now
    // relies on settings globals too much.
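    // Assumption, based on the division below: cache_set_limit() takes a
    // count of buffers, so the per-thread byte budget is converted to a
    // buffer count with a floor of one.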
    if (settings.read_buf_mem_limit) {
        int limit = settings.read_buf_mem_limit / settings.num_threads;
        if (limit < READ_BUFFER_SIZE) {
            limit = 1;
        } else {
            limit = limit / READ_BUFFER_SIZE;
        }
        cache_set_limit(me->rbuf_cache, limit);
    }

    me->io_cache = cache_create("io", sizeof(io_pending_t), sizeof(char*), NULL, NULL);
    if (me->io_cache == NULL) {
        fprintf(stderr, "Failed to create IO object cache\n");
        exit(EXIT_FAILURE);
    }
#ifdef TLS
    if (settings.ssl_enabled) {
        me->ssl_wbuf = (char *)malloc((size_t)settings.ssl_wbuf_size);
        if (me->ssl_wbuf == NULL) {
            fprintf(stderr, "Failed to allocate the SSL write buffer\n");
            exit(EXIT_FAILURE);
        }
    }
#endif
}

/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; memcached_thread_init() will block until
     * all threads have finished initializing.
     */
    me->l = logger_create();
    me->lru_bump_buf = item_lru_bump_buf_create();
    if (me->l == NULL || me->lru_bump_buf == NULL) {
        abort();
    }

    if (settings.drop_privileges) {
        drop_worker_privileges();
    }

    register_thread_initialized();

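    /* Run the event loop until thread_libevent_process() sees an 's' (stop)
     * command and calls event_base_loopexit(). */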
    event_base_loop(me->base, 0);

    // same mechanism used to watch for all threads exiting.
    register_thread_initialized();

    event_base_free(me->base);
    return NULL;
}


/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];
    conn *c;
    unsigned int fd_from_pipe;

    if (read(fd, buf, 1) != 1) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }

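    /* One-byte commands arrive on the notify pipe: 'c' new connection,
     * 'p' pause, 't' close an idle client, 'r' redispatch a connection,
     * 's' stop. */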
    switch (buf[0]) {
    case 'c':
        item = cq_pop(me->new_conn_queue);

        if (NULL == item) {
            break;
        }
        switch (item->mode) {
            case queue_new_conn:
                c = conn_new(item->sfd, item->init_state, item->event_flags,
                                   item->read_buffer_size, item->transport,
                                   me->base, item->ssl);
                if (c == NULL) {
                    if (IS_UDP(item->transport)) {
                        fprintf(stderr, "Can't listen for events on UDP socket\n");
                        exit(1);
                    } else {
                        if (settings.verbose > 0) {
                            fprintf(stderr, "Can't listen for events on fd %d\n",
                                item->sfd);
                        }
#ifdef TLS
                        if (item->ssl) {
                            SSL_shutdown(item->ssl);
                            SSL_free(item->ssl);
                        }
#endif
                        close(item->sfd);
                    }
                } else {
                    c->thread = me;
#ifdef EXTSTORE
                    if (c->thread->storage) {
                        conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->storage,
                            storage_submit_cb, storage_complete_cb, storage_finalize_cb);
                    }
#endif
                    conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL);

#ifdef TLS
                    if (settings.ssl_enabled && c->ssl != NULL) {
                        assert(c->thread && c->thread->ssl_wbuf);
                        c->ssl_wbuf = c->thread->ssl_wbuf;
                    }
#endif
                }
                break;
        }
        cqi_free(item);
        break;
    /* we were told to pause and report in */
    case 'p':
        register_thread_initialized();
        break;
    /* a client socket timed out */
    case 't':
        if (read(fd, &fd_from_pipe, sizeof(fd_from_pipe)) != sizeof(fd_from_pipe)) {
            if (settings.verbose > 0)
                fprintf(stderr, "Can't read timeout fd from libevent pipe\n");
            return;
        }
        conn_close_idle(conns[fd_from_pipe]);
        break;
    /* a side thread redispatched a client connection */
    case 'r':
        if (read(fd, &fd_from_pipe, sizeof(fd_from_pipe)) != sizeof(fd_from_pipe)) {
            if (settings.verbose > 0)
                fprintf(stderr, "Can't read redispatch fd from libevent pipe\n");
            return;
        }
        conn_worker_readd(conns[fd_from_pipe]);
        break;
    /* asked to stop */
    case 's':
        event_base_loopexit(me->base, NULL);
        break;
    }
}

/* Which thread we assigned a connection to most recently. */
static int last_thread = -1;

/* Last thread we assigned to a connection based on napi_id */
static int last_thread_by_napi_id = -1;

static LIBEVENT_THREAD *select_thread_round_robin(void)
{
    int tid = (last_thread + 1) % settings.num_threads;

    last_thread = tid;

    return threads + tid;
}

static void reset_threads_napi_id(void)
{
    LIBEVENT_THREAD *thread;
    int i;

    for (i = 0; i < settings.num_threads; i++) {
         thread = threads + i;
         thread->napi_id = 0;
    }

    last_thread_by_napi_id = -1;
}

/* Select a worker thread based on the NAPI ID of an incoming connection
 * request. NAPI ID is a globally unique ID that identifies a NIC RX queue
 * on which a flow is received.
 */
static LIBEVENT_THREAD *select_thread_by_napi_id(int sfd)
{
    LIBEVENT_THREAD *thread;
    int napi_id, err, i;
    socklen_t len;
    int tid = -1;

    len = sizeof(socklen_t);
    err = getsockopt(sfd, SOL_SOCKET, SO_INCOMING_NAPI_ID, &napi_id, &len);
    if ((err == -1) || (napi_id == 0)) {
        STATS_LOCK();
        stats.round_robin_fallback++;
        STATS_UNLOCK();
        return select_thread_round_robin();
    }

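    /* Scan for a thread already bound to this NAPI ID, binding previously
     * unused threads on demand; if every thread is bound to some other ID,
     * reset all bindings and rescan. */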
select:
    for (i = 0; i < settings.num_threads; i++) {
         thread = threads + i;
         if (last_thread_by_napi_id < i) {
             thread->napi_id = napi_id;
             last_thread_by_napi_id = i;
             tid = i;
             break;
         }
         if (thread->napi_id == napi_id) {
             tid = i;
             break;
         }
    }

    if (tid == -1) {
        STATS_LOCK();
        stats.unexpected_napi_ids++;
        STATS_UNLOCK();
        reset_threads_napi_id();
        goto select;
    }

    return threads + tid;
}

/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, either during initialization (for UDP) or because
 * of an incoming connection.
 */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport, void *ssl) {
    CQ_ITEM *item = cqi_new();
    char buf[1];
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return;
    }

    LIBEVENT_THREAD *thread;

    if (!settings.num_napi_ids)
        thread = select_thread_round_robin();
    else
        thread = select_thread_by_napi_id(sfd);

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;
    item->mode = queue_new_conn;
    item->ssl = ssl;

    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, (int64_t)thread->thread_id);
    buf[0] = 'c';
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}

/*
 * Re-dispatches a connection back to the original thread. Can be called from
 * any side thread borrowing a connection.
 */
#define REDISPATCH_MSG_SIZE (1 + sizeof(int))
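/* The message is one 'r' byte followed by the connection's file descriptor. */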
void redispatch_conn(conn *c) {
    char buf[REDISPATCH_MSG_SIZE];
    LIBEVENT_THREAD *thread = c->thread;

    buf[0] = 'r';
    memcpy(&buf[1], &c->sfd, sizeof(int));
    if (write(thread->notify_send_fd, buf, REDISPATCH_MSG_SIZE) != REDISPATCH_MSG_SIZE) {
        perror("Writing redispatch to thread notify pipe");
    }
}

/* This misses the allow_new_conns flag :( */
void sidethread_conn_close(conn *c) {
    if (settings.verbose > 1)
        fprintf(stderr, "<%d connection closing from side thread.\n", c->sfd);

    c->state = conn_closing;
    // redispatch will see closing flag and properly close connection.
    redispatch_conn(c);
    return;
}

/********************************* ITEM ACCESS *******************************/

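/* Most of the wrappers below hash the key and hold the corresponding item
 * lock around a do_* call, which assumes the lock is already held;
 * item_alloc and item_replace are the exceptions, as noted. */
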
/*
 * Allocates a new item.
 */
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
    item *it;
    /* do_item_alloc handles its own locks */
    it = do_item_alloc(key, nkey, flags, exptime, nbytes);
    return it;
}

/*
 * Returns an item if it hasn't been marked as expired,
 * lazy-expiring as needed.
 */
item *item_get(const char *key, const size_t nkey, conn *c, const bool do_update) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_get(key, nkey, hv, c, do_update);
    item_unlock(hv);
    return it;
}

// returns an item with the item lock held.
// lock will still be held even if return is NULL, allowing caller to replace
// an item atomically if desired.
item *item_get_locked(const char *key, const size_t nkey, conn *c, const bool do_update, uint32_t *hv) {
    item *it;
    *hv = hash(key, nkey);
    item_lock(*hv);
    it = do_item_get(key, nkey, *hv, c, do_update);
    return it;
}

item *item_touch(const char *key, size_t nkey, uint32_t exptime, conn *c) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_touch(key, nkey, exptime, hv, c);
    item_unlock(hv);
    return it;
}

/*
 * Links an item into the LRU and hashtable.
 */
int item_link(item *item) {
    int ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    ret = do_item_link(item, hv);
    item_unlock(hv);
    return ret;
}

/*
 * Decrements the reference count on an item and adds it to the freelist if
 * needed.
 */
void item_remove(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);

    item_lock(hv);
    do_item_remove(item);
    item_unlock(hv);
}

/*
 * Replaces one item with another in the hashtable.
 * Unprotected by a mutex lock since the core server does not require
 * it to be thread-safe.
 */
int item_replace(item *old_it, item *new_it, const uint32_t hv) {
    return do_item_replace(old_it, new_it, hv);
}

/*
 * Unlinks an item from the LRU and hashtable.
 */
void item_unlink(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    do_item_unlink(item, hv);
    item_unlock(hv);
}

/*
 * Does arithmetic on a numeric item value.
 */
enum delta_result_type add_delta(conn *c, const char *key,
                                 const size_t nkey, bool incr,
                                 const int64_t delta, char *buf,
                                 uint64_t *cas) {
    enum delta_result_type ret;
    uint32_t hv;

    hv = hash(key, nkey);
    item_lock(hv);
    ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv, NULL);
    item_unlock(hv);
    return ret;
}

/*
 * Stores an item in the cache (high level, obeys set/add/replace semantics)
 */
enum store_item_type store_item(item *item, int comm, conn* c) {
    enum store_item_type ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    ret = do_store_item(item, comm, c, hv);
    item_unlock(hv);
    return ret;
}

/******************************* GLOBAL STATS ******************************/

void STATS_LOCK() {
    pthread_mutex_lock(&stats_lock);
}

void STATS_UNLOCK() {
    pthread_mutex_unlock(&stats_lock);
}

void threadlocal_stats_reset(void) {
    int ii;
    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);
#define X(name) threads[ii].stats.name = 0;
        THREAD_STATS_FIELDS
#ifdef EXTSTORE
        EXTSTORE_THREAD_STATS_FIELDS
#endif
#undef X

        memset(&threads[ii].stats.slab_stats, 0,
                sizeof(threads[ii].stats.slab_stats));
        memset(&threads[ii].stats.lru_hits, 0,
                sizeof(uint64_t) * POWER_LARGEST);

        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}

void threadlocal_stats_aggregate(struct thread_stats *stats) {
    int ii, sid;

    /* The struct has a mutex, but we can safely set the whole thing
     * to zero since it is unused when aggregating. */
    memset(stats, 0, sizeof(*stats));

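    /* THREAD_STATS_FIELDS expands X(name) once per stat field, so each
     * per-thread counter is summed into the aggregate under that thread's
     * stats mutex. */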
    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);
#define X(name) stats->name += threads[ii].stats.name;
        THREAD_STATS_FIELDS
#ifdef EXTSTORE
        EXTSTORE_THREAD_STATS_FIELDS
#endif
#undef X

        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
#define X(name) stats->slab_stats[sid].name += \
            threads[ii].stats.slab_stats[sid].name;
            SLAB_STATS_FIELDS
#undef X
        }

        for (sid = 0; sid < POWER_LARGEST; sid++) {
            stats->lru_hits[sid] +=
                threads[ii].stats.lru_hits[sid];
            stats->slab_stats[CLEAR_LRU(sid)].get_hits +=
                threads[ii].stats.lru_hits[sid];
        }

        stats->read_buf_count += threads[ii].rbuf_cache->total;
        stats->read_buf_bytes += threads[ii].rbuf_cache->total * READ_BUFFER_SIZE;
        stats->read_buf_bytes_free += threads[ii].rbuf_cache->freecurr * READ_BUFFER_SIZE;
        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}

void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
    int sid;

    memset(out, 0, sizeof(*out));

    for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
#define X(name) out->name += stats->slab_stats[sid].name;
        SLAB_STATS_FIELDS
#undef X
    }
}

/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 */
void memcached_thread_init(int nthreads, void *arg) {
    int         i;
    int         power;

    for (i = 0; i < POWER_LARGEST; i++) {
        pthread_mutex_init(&lru_locks[i], NULL);
    }
    pthread_mutex_init(&worker_hang_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    /* Want a wide lock table, but don't waste memory */
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else if (nthreads <= 10) {
        power = 13;
    } else if (nthreads <= 20) {
        power = 14;
    } else {
        /* 32k buckets. just under the hashpower default. */
        power = 15;
    }
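    /* e.g. the default of 4 worker threads (-t 4) lands in the nthreads < 5
     * bucket, giving a 2^12 == 4096 entry item lock table. */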

    if (power >= hashpower) {
        fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
        fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
        fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
        exit(1);
    }

    item_lock_count = hashsize(power);
    item_lock_hashpower = power;

    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }

    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];
#ifdef EXTSTORE
        threads[i].storage = arg;
#endif
        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats_state.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}