"Fossies" - the Fresh Open Source Software Archive

Member "memcached-1.6.15/thread.c" (21 Feb 2022, 32047 Bytes) of package /linux/www/memcached-1.6.15.tar.gz:


/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Thread management for memcached.
 */
#include "memcached.h"
#ifdef EXTSTORE
#include "storage.h"
#endif
#ifdef HAVE_EVENTFD
#include <sys/eventfd.h>
#endif
#ifdef PROXY
#include "proto_proxy.h"
#endif
#include <assert.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#include "queue.h"

#ifdef __sun
#include <atomic.h>
#endif

#ifdef TLS
#include <openssl/ssl.h>
#endif

#define ITEMS_PER_ALLOC 64

/* An item in the connection queue. */
enum conn_queue_item_modes {
    queue_new_conn,   /* brand new connection. */
    queue_pause,      /* pause thread */
    queue_timeout,    /* socket sfd timed out */
    queue_redispatch, /* return conn from side thread */
    queue_stop,       /* exit thread */
    queue_return_io,  /* returning a pending IO object immediately */
#ifdef PROXY
    queue_proxy_reload, /* signal proxy to reload worker VM */
#endif
};
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int               sfd;
    enum conn_states  init_state;
    int               event_flags;
    int               read_buffer_size;
    enum network_transport     transport;
    enum conn_queue_item_modes mode;
    conn *c;
    void    *ssl;
    io_pending_t *io; // IO when used for deferred IO handling.
    STAILQ_ENTRY(conn_queue_item) i_next;
};

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    STAILQ_HEAD(conn_ev_head, conn_queue_item) head;
    pthread_mutex_t lock;
    cache_t *cache; /* freelisted objects */
};

/* Locks for cache LRU operations */
pthread_mutex_t lru_locks[POWER_LARGEST];

/* Connection lock around accepting new connections */
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;

#if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif

/* Lock for global stats */
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

/* Lock to cause worker threads to hang up after being woken */
static pthread_mutex_t worker_hang_lock;

static pthread_mutex_t *item_locks;
/* size of the item lock hash table */
static uint32_t item_lock_count;
unsigned int item_lock_hashpower;
#define hashsize(n) ((unsigned long int)1<<(n))
#define hashmask(n) (hashsize(n)-1)
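
/* Example: with item_lock_hashpower 13, hashsize(13) is 8192 buckets and
 * hashmask(13) is 0x1FFF, so (hv & hashmask(13)) selects one of 8192 locks. */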

/*
 * Each libevent instance has a wakeup pipe, which other threads
 * can use to signal that they've put a new connection on its queue.
 */
static LIBEVENT_THREAD *threads;

/*
 * Number of worker threads that have finished setting themselves up.
 */
static int init_count = 0;
static pthread_mutex_t init_lock;
static pthread_cond_t init_cond;

static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item);
static void notify_worker_fd(LIBEVENT_THREAD *t, int sfd, enum conn_queue_item_modes mode);
static CQ_ITEM *cqi_new(CQ *cq);
static void cq_push(CQ *cq, CQ_ITEM *item);

static void thread_libevent_process(evutil_socket_t fd, short which, void *arg);

/* item_lock() must be held for an item before any modifications to either its
 * associated hash bucket, or the structure itself.
 * LRU modifications must hold the item lock, and the LRU lock.
 * LRU's accessing items must item_trylock() before modifying an item.
 * Items accessible from an LRU must not be freed or modified
 * without first locking and removing from the LRU.
 */
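
/* A minimal usage sketch, mirroring what wrappers like item_get() below do:
 *
 *   uint32_t hv = hash(key, nkey);
 *   item_lock(hv);
 *   item *it = do_item_get(key, nkey, hv, c, true);
 *   ... read or modify the item / its hash bucket ...
 *   item_unlock(hv);
 */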

void item_lock(uint32_t hv) {
    mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}

void *item_trylock(uint32_t hv) {
    pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
    if (pthread_mutex_trylock(lock) == 0) {
        return lock;
    }
    return NULL;
}

void item_trylock_unlock(void *lock) {
    mutex_unlock((pthread_mutex_t *) lock);
}

void item_unlock(uint32_t hv) {
    mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}

static void wait_for_thread_registration(int nthreads) {
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
}

static void register_thread_initialized(void) {
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);
    /* Force worker threads to pile up if someone wants us to */
    pthread_mutex_lock(&worker_hang_lock);
    pthread_mutex_unlock(&worker_hang_lock);
}

/* Must not be called with any deeper locks held */
void pause_threads(enum pause_thread_types type) {
    int i;
    bool pause_workers = false;

    switch (type) {
        case PAUSE_ALL_THREADS:
            slabs_rebalancer_pause();
            lru_maintainer_pause();
            lru_crawler_pause();
#ifdef EXTSTORE
            storage_compact_pause();
            storage_write_pause();
#endif
        case PAUSE_WORKER_THREADS:
            pause_workers = true;
            pthread_mutex_lock(&worker_hang_lock);
            break;
        case RESUME_ALL_THREADS:
            slabs_rebalancer_resume();
            lru_maintainer_resume();
            lru_crawler_resume();
#ifdef EXTSTORE
            storage_compact_resume();
            storage_write_resume();
#endif
        case RESUME_WORKER_THREADS:
            pthread_mutex_unlock(&worker_hang_lock);
            break;
        default:
            fprintf(stderr, "Unknown lock type: %d\n", type);
            assert(1 == 0);
            break;
    }

    /* Only send a message if we have one. */
    if (!pause_workers) {
        return;
    }

    pthread_mutex_lock(&init_lock);
    init_count = 0;
    for (i = 0; i < settings.num_threads; i++) {
        notify_worker_fd(&threads[i], 0, queue_pause);
    }
    wait_for_thread_registration(settings.num_threads);
    pthread_mutex_unlock(&init_lock);
}
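
/* Illustrative pause/resume pairing (roughly how callers such as the hash
 * table expansion code use this):
 *
 *   pause_threads(PAUSE_ALL_THREADS);
 *   ... swap or resize a shared structure safely ...
 *   pause_threads(RESUME_ALL_THREADS);
 *
 * Note PAUSE_ALL_THREADS falls through to PAUSE_WORKER_THREADS above, so it
 * also parks the workers on worker_hang_lock until the matching resume call.
 */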

// MUST not be called with any deeper locks held
// MUST be called only by parent thread
// Note: listener thread is the "main" event base, which has exited its
// loop in order to call this function.
void stop_threads(void) {
    int i;

    // assoc can call pause_threads(), so we have to stop it first.
    stop_assoc_maintenance_thread();
    if (settings.verbose > 0)
        fprintf(stderr, "stopped assoc\n");

    if (settings.verbose > 0)
        fprintf(stderr, "asking workers to stop\n");

    pthread_mutex_lock(&worker_hang_lock);
    pthread_mutex_lock(&init_lock);
    init_count = 0;
    for (i = 0; i < settings.num_threads; i++) {
        notify_worker_fd(&threads[i], 0, queue_stop);
    }
    wait_for_thread_registration(settings.num_threads);
    pthread_mutex_unlock(&init_lock);

    // All of the workers are hung but haven't done cleanup yet.

    if (settings.verbose > 0)
        fprintf(stderr, "asking background threads to stop\n");

    // stop each side thread.
    // TODO: Verify these all work if the threads are already stopped
    stop_item_crawler_thread(CRAWLER_WAIT);
    if (settings.verbose > 0)
        fprintf(stderr, "stopped lru crawler\n");
    if (settings.lru_maintainer_thread) {
        stop_lru_maintainer_thread();
        if (settings.verbose > 0)
            fprintf(stderr, "stopped maintainer\n");
    }
    if (settings.slab_reassign) {
        stop_slab_maintenance_thread();
        if (settings.verbose > 0)
            fprintf(stderr, "stopped slab mover\n");
    }
    logger_stop();
    if (settings.verbose > 0)
        fprintf(stderr, "stopped logger thread\n");
    stop_conn_timeout_thread();
    if (settings.verbose > 0)
        fprintf(stderr, "stopped idle timeout thread\n");

    // Close all connections then let the workers finally exit.
    if (settings.verbose > 0)
        fprintf(stderr, "closing connections\n");
    conn_close_all();
    pthread_mutex_unlock(&worker_hang_lock);
    if (settings.verbose > 0)
        fprintf(stderr, "reaping worker threads\n");
    for (i = 0; i < settings.num_threads; i++) {
        pthread_join(threads[i].thread_id, NULL);
    }

    if (settings.verbose > 0)
        fprintf(stderr, "all background threads stopped\n");

    // At this point, every background thread must be stopped.
}

/*
 * Initializes a connection queue.
 */
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    STAILQ_INIT(&cq->head);
    cq->cache = cache_create("cq", sizeof(CQ_ITEM), sizeof(char *));
    if (cq->cache == NULL) {
        fprintf(stderr, "Failed to create connection queue cache\n");
        exit(EXIT_FAILURE);
    }
}

/*
 * Looks for an item on a connection queue, but doesn't block if there isn't
 * one.
 * Returns the item, or NULL if no item is available
 */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;

    pthread_mutex_lock(&cq->lock);
    item = STAILQ_FIRST(&cq->head);
    if (item != NULL) {
        STAILQ_REMOVE_HEAD(&cq->head, i_next);
    }
    pthread_mutex_unlock(&cq->lock);

    return item;
}

/*
 * Adds an item to a connection queue.
 */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    pthread_mutex_lock(&cq->lock);
    STAILQ_INSERT_TAIL(&cq->head, item, i_next);
    pthread_mutex_unlock(&cq->lock);
}

/*
 * Returns a fresh connection queue item.
 */
static CQ_ITEM *cqi_new(CQ *cq) {
    CQ_ITEM *item = cache_alloc(cq->cache);
    if (item == NULL) {
        STATS_LOCK();
        stats.malloc_fails++;
        STATS_UNLOCK();
    }
    return item;
}

/*
 * Frees a connection queue item (adds it to the freelist.)
 */
static void cqi_free(CQ *cq, CQ_ITEM *item) {
    cache_free(cq->cache, item);
}

// TODO: Skip notify if queue wasn't empty?
// - Requires cq_push() returning a "was empty" flag
// - Requires event handling loop to pop the entire queue and work from that
// instead of the ev_count work there now.
// In testing this does result in a large performance uptick, but unclear how
// much that will transfer from a synthetic benchmark.
static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item) {
    cq_push(t->ev_queue, item);
#ifdef HAVE_EVENTFD
    uint64_t u = 1;
    if (write(t->notify_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
        perror("failed writing to worker eventfd");
        /* TODO: This is a fatal problem. Can it ever happen temporarily? */
    }
#else
    char buf[1] = "c";
    if (write(t->notify_send_fd, buf, 1) != 1) {
        perror("Failed writing to notify pipe");
        /* TODO: This is a fatal problem. Can it ever happen temporarily? */
    }
#endif
}

// NOTE: An external func that takes a conn *c might be cleaner overall.
static void notify_worker_fd(LIBEVENT_THREAD *t, int sfd, enum conn_queue_item_modes mode) {
    CQ_ITEM *item;
    while ( (item = cqi_new(t->ev_queue)) == NULL ) {
        // NOTE: most callers of this function cannot fail, but mallocs in
        // theory can fail. Small mallocs essentially never do without also
        // killing the process. Syscalls can also fail but the original code
        // never handled this either.
        // As a compromise, I'm leaving this note and this loop: This alloc
        // cannot fail, but pre-allocating the data is too much code in an
        // area I want to keep more lean. If this CQ business becomes a more
        // generic queue I'll reconsider.
    }

    item->mode = mode;
    item->sfd = sfd;
    notify_worker(t, item);
}

/*
 * Creates a worker thread.
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}

/*
 * Sets whether or not we accept new connections.
 */
void accept_new_conns(const bool do_accept) {
    pthread_mutex_lock(&conn_lock);
    do_accept_new_conns(do_accept);
    pthread_mutex_unlock(&conn_lock);
}
/****************************** LIBEVENT THREADS *****************************/

/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
#if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101
    struct event_config *ev_config;
    ev_config = event_config_new();
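    /* Each event base is driven by exactly one worker thread, so libevent's
     * internal locking can be skipped via EVENT_BASE_FLAG_NOLOCK. */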
    event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
    me->base = event_base_new_with_config(ev_config);
    event_config_free(ev_config);
#else
    me->base = event_init();
#endif

    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads */
#ifdef HAVE_EVENTFD
    event_set(&me->notify_event, me->notify_event_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
#else
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
#endif
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    me->ev_queue = malloc(sizeof(struct conn_queue));
    if (me->ev_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->ev_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

    me->rbuf_cache = cache_create("rbuf", READ_BUFFER_SIZE, sizeof(char *));
    if (me->rbuf_cache == NULL) {
        fprintf(stderr, "Failed to create read buffer cache\n");
        exit(EXIT_FAILURE);
    }
    // Note: we were cleanly passing in num_threads before, but this now
    // relies on settings globals too much.
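    /* Worked example (assuming READ_BUFFER_SIZE is 16KB): a 64MB
     * read_buf_mem_limit split across 4 worker threads gives 16MB per
     * thread, i.e. a cache_set_limit() of 1024 cached buffers each. */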
    if (settings.read_buf_mem_limit) {
        int limit = settings.read_buf_mem_limit / settings.num_threads;
        if (limit < READ_BUFFER_SIZE) {
            limit = 1;
        } else {
            limit = limit / READ_BUFFER_SIZE;
        }
        cache_set_limit(me->rbuf_cache, limit);
    }

    me->io_cache = cache_create("io", sizeof(io_pending_t), sizeof(char*));
    if (me->io_cache == NULL) {
        fprintf(stderr, "Failed to create IO object cache\n");
        exit(EXIT_FAILURE);
    }
#ifdef TLS
    if (settings.ssl_enabled) {
        me->ssl_wbuf = (char *)malloc((size_t)settings.ssl_wbuf_size);
        if (me->ssl_wbuf == NULL) {
            fprintf(stderr, "Failed to allocate the SSL write buffer\n");
            exit(EXIT_FAILURE);
        }
    }
#endif
#ifdef EXTSTORE
    // me->storage is set just before this function is called.
    if (me->storage) {
        thread_io_queue_add(me, IO_QUEUE_EXTSTORE, me->storage,
            storage_submit_cb, storage_complete_cb, NULL, storage_finalize_cb);
    }
#endif
#ifdef PROXY
    thread_io_queue_add(me, IO_QUEUE_PROXY, settings.proxy_ctx, proxy_submit_cb,
            proxy_complete_cb, proxy_return_cb, proxy_finalize_cb);

    // TODO: maybe register hooks to be called here from sub-packages? ie;
    // extstore, TLS, proxy.
    if (settings.proxy_enabled) {
        proxy_thread_init(me);
    }
#endif
    thread_io_queue_add(me, IO_QUEUE_NONE, NULL, NULL, NULL, NULL, NULL);
}

/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; memcached_thread_init() will block until
     * all threads have finished initializing.
     */
    me->l = logger_create();
    me->lru_bump_buf = item_lru_bump_buf_create();
    if (me->l == NULL || me->lru_bump_buf == NULL) {
        abort();
    }

    if (settings.drop_privileges) {
        drop_worker_privileges();
    }

    register_thread_initialized();

    event_base_loop(me->base, 0);

    // same mechanism used to watch for all threads exiting.
    register_thread_initialized();

    event_base_free(me->base);
    return NULL;
}


/*
 * Processes an incoming "connection event" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
// Syscalls can be expensive enough that handling a few of them once here can
// save both throughput and overall latency.
#define MAX_PIPE_EVENTS 32
static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    conn *c;
    uint64_t ev_count = 0; // max number of events to loop through this run.
#ifdef HAVE_EVENTFD
    // NOTE: unlike pipe we aren't limiting the number of events per read.
    // However we do limit the number of queue pulls to what the count was at
    // the time of this function firing.
    if (read(fd, &ev_count, sizeof(uint64_t)) != sizeof(uint64_t)) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }
#else
    char buf[MAX_PIPE_EVENTS];

    ev_count = read(fd, buf, MAX_PIPE_EVENTS);
    if (ev_count == 0) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }
#endif

    for (int x = 0; x < ev_count; x++) {
        item = cq_pop(me->ev_queue);
        if (item == NULL) {
            return;
        }

        switch (item->mode) {
            case queue_new_conn:
                c = conn_new(item->sfd, item->init_state, item->event_flags,
                                   item->read_buffer_size, item->transport,
                                   me->base, item->ssl);
                if (c == NULL) {
                    if (IS_UDP(item->transport)) {
                        fprintf(stderr, "Can't listen for events on UDP socket\n");
                        exit(1);
                    } else {
                        if (settings.verbose > 0) {
                            fprintf(stderr, "Can't listen for events on fd %d\n",
                                item->sfd);
                        }
#ifdef TLS
                        if (item->ssl) {
                            SSL_shutdown(item->ssl);
                            SSL_free(item->ssl);
                        }
#endif
                        close(item->sfd);
                    }
                } else {
                    c->thread = me;
                    conn_io_queue_setup(c);
#ifdef TLS
                    if (settings.ssl_enabled && c->ssl != NULL) {
                        assert(c->thread && c->thread->ssl_wbuf);
                        c->ssl_wbuf = c->thread->ssl_wbuf;
                    }
#endif
                }
                break;
            case queue_pause:
                /* we were told to pause and report in */
                register_thread_initialized();
                break;
            case queue_timeout:
                /* a client socket timed out */
                conn_close_idle(conns[item->sfd]);
                break;
            case queue_redispatch:
                /* a side thread redispatched a client connection */
                conn_worker_readd(conns[item->sfd]);
                break;
            case queue_stop:
                /* asked to stop */
                event_base_loopexit(me->base, NULL);
                break;
            case queue_return_io:
                /* getting an individual IO object back */
                conn_io_queue_return(item->io);
                break;
#ifdef PROXY
            case queue_proxy_reload:
                proxy_worker_reload(settings.proxy_ctx, me);
                break;
#endif
        }

        cqi_free(me->ev_queue, item);
    }
}

// NOTE: need better encapsulation.
// used by the proxy module to iterate the worker threads.
LIBEVENT_THREAD *get_worker_thread(int id) {
    return &threads[id];
}

/* Which thread we assigned a connection to most recently. */
static int last_thread = -1;

/* Last thread we assigned to a connection based on napi_id */
static int last_thread_by_napi_id = -1;

static LIBEVENT_THREAD *select_thread_round_robin(void)
{
    int tid = (last_thread + 1) % settings.num_threads;

    last_thread = tid;

    return threads + tid;
}

static void reset_threads_napi_id(void)
{
    LIBEVENT_THREAD *thread;
    int i;

    for (i = 0; i < settings.num_threads; i++) {
         thread = threads + i;
         thread->napi_id = 0;
    }

    last_thread_by_napi_id = -1;
}

/* Select a worker thread based on the NAPI ID of an incoming connection
 * request. NAPI ID is a globally unique ID that identifies a NIC RX queue
 * on which a flow is received.
 */
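/* Threads claim NAPI IDs on a first-come basis: the first distinct ID seen is
 * pinned to thread 0, the next to thread 1, and so on; later connections
 * carrying an already-claimed ID land back on that same thread. */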
static LIBEVENT_THREAD *select_thread_by_napi_id(int sfd)
{
    LIBEVENT_THREAD *thread;
    int napi_id, err, i;
    socklen_t len;
    int tid = -1;

    len = sizeof(socklen_t);
    err = getsockopt(sfd, SOL_SOCKET, SO_INCOMING_NAPI_ID, &napi_id, &len);
    if ((err == -1) || (napi_id == 0)) {
        STATS_LOCK();
        stats.round_robin_fallback++;
        STATS_UNLOCK();
        return select_thread_round_robin();
    }

select:
    for (i = 0; i < settings.num_threads; i++) {
         thread = threads + i;
         if (last_thread_by_napi_id < i) {
             thread->napi_id = napi_id;
             last_thread_by_napi_id = i;
             tid = i;
             break;
         }
         if (thread->napi_id == napi_id) {
             tid = i;
             break;
         }
    }

    if (tid == -1) {
        STATS_LOCK();
        stats.unexpected_napi_ids++;
        STATS_UNLOCK();
        reset_threads_napi_id();
        goto select;
    }

    return threads + tid;
}

/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, either during initialization (for UDP) or because
 * of an incoming connection.
 */
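/* A rough sketch of the caller's side (the listener's accept path looks
 * approximately like this; the exact arguments live in memcached.c):
 *
 *   dispatch_conn_new(accepted_fd, conn_new_cmd, EV_READ | EV_PERSIST,
 *                     DATA_BUFFER_SIZE, tcp_transport, ssl_object_or_NULL);
 */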
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport, void *ssl) {
    CQ_ITEM *item = NULL;
    LIBEVENT_THREAD *thread;

    if (!settings.num_napi_ids)
        thread = select_thread_round_robin();
    else
        thread = select_thread_by_napi_id(sfd);

    item = cqi_new(thread->ev_queue);
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return;
    }

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;
    item->mode = queue_new_conn;
    item->ssl = ssl;

    MEMCACHED_CONN_DISPATCH(sfd, (int64_t)thread->thread_id);
    notify_worker(thread, item);
}

/*
 * Re-dispatches a connection back to the original thread. Can be called from
 * any side thread borrowing a connection.
 */
void redispatch_conn(conn *c) {
    notify_worker_fd(c->thread, c->sfd, queue_redispatch);
}

void timeout_conn(conn *c) {
    notify_worker_fd(c->thread, c->sfd, queue_timeout);
}
#ifdef PROXY
void proxy_reload_notify(LIBEVENT_THREAD *t) {
    notify_worker_fd(t, 0, queue_proxy_reload);
}
#endif

void return_io_pending(io_pending_t *io) {
    CQ_ITEM *item = cqi_new(io->thread->ev_queue);
    if (item == NULL) {
        // TODO: how can we avoid this?
        // In the main case I just loop, since a malloc failure here for a
        // tiny object that's generally in a fixed size queue is going to
        // implode shortly.
        return;
    }

    item->mode = queue_return_io;
    item->io = io;

    notify_worker(io->thread, item);
}

/* This misses the allow_new_conns flag :( */
void sidethread_conn_close(conn *c) {
    if (settings.verbose > 1)
        fprintf(stderr, "<%d connection closing from side thread.\n", c->sfd);

    c->state = conn_closing;
    // redispatch will see closing flag and properly close connection.
    redispatch_conn(c);
    return;
}

/********************************* ITEM ACCESS *******************************/

/*
 * Allocates a new item.
 */
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
    item *it;
    /* do_item_alloc handles its own locks */
    it = do_item_alloc(key, nkey, flags, exptime, nbytes);
    return it;
}

/*
 * Returns an item if it hasn't been marked as expired,
 * lazy-expiring as needed.
 */
item *item_get(const char *key, const size_t nkey, conn *c, const bool do_update) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_get(key, nkey, hv, c, do_update);
    item_unlock(hv);
    return it;
}

// returns an item with the item lock held.
// lock will still be held even if return is NULL, allowing caller to replace
// an item atomically if desired.
item *item_get_locked(const char *key, const size_t nkey, conn *c, const bool do_update, uint32_t *hv) {
    item *it;
    *hv = hash(key, nkey);
    item_lock(*hv);
    it = do_item_get(key, nkey, *hv, c, do_update);
    return it;
}

item *item_touch(const char *key, size_t nkey, uint32_t exptime, conn *c) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey);
    item_lock(hv);
    it = do_item_touch(key, nkey, exptime, hv, c);
    item_unlock(hv);
    return it;
}

/*
 * Links an item into the LRU and hashtable.
 */
int item_link(item *item) {
    int ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    ret = do_item_link(item, hv);
    item_unlock(hv);
    return ret;
}

/*
 * Decrements the reference count on an item and adds it to the freelist if
 * needed.
 */
void item_remove(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);

    item_lock(hv);
    do_item_remove(item);
    item_unlock(hv);
}

/*
 * Replaces one item with another in the hashtable.
 * Unprotected by a mutex lock since the core server does not require
 * it to be thread-safe.
 */
int item_replace(item *old_it, item *new_it, const uint32_t hv) {
    return do_item_replace(old_it, new_it, hv);
}

/*
 * Unlinks an item from the LRU and hashtable.
 */
void item_unlink(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    do_item_unlink(item, hv);
    item_unlock(hv);
}

/*
 * Does arithmetic on a numeric item value.
 */
enum delta_result_type add_delta(conn *c, const char *key,
                                 const size_t nkey, bool incr,
                                 const int64_t delta, char *buf,
                                 uint64_t *cas) {
    enum delta_result_type ret;
    uint32_t hv;

    hv = hash(key, nkey);
    item_lock(hv);
    ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv, NULL);
    item_unlock(hv);
    return ret;
}

/*
 * Stores an item in the cache (high level, obeys set/add/replace semantics)
 */
enum store_item_type store_item(item *item, int comm, conn* c) {
    enum store_item_type ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey);
    item_lock(hv);
    ret = do_store_item(item, comm, c, hv);
    item_unlock(hv);
    return ret;
}

/******************************* GLOBAL STATS ******************************/

void STATS_LOCK() {
    pthread_mutex_lock(&stats_lock);
}

void STATS_UNLOCK() {
    pthread_mutex_unlock(&stats_lock);
}

void threadlocal_stats_reset(void) {
    int ii;
    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);
#define X(name) threads[ii].stats.name = 0;
        THREAD_STATS_FIELDS
#ifdef EXTSTORE
        EXTSTORE_THREAD_STATS_FIELDS
#endif
#ifdef PROXY
        PROXY_THREAD_STATS_FIELDS
#endif
#undef X

        memset(&threads[ii].stats.slab_stats, 0,
                sizeof(threads[ii].stats.slab_stats));
        memset(&threads[ii].stats.lru_hits, 0,
                sizeof(uint64_t) * POWER_LARGEST);

        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}

void threadlocal_stats_aggregate(struct thread_stats *stats) {
    int ii, sid;

    /* The struct has a mutex, but we can safely set the whole thing
     * to zero since it is unused when aggregating. */
    memset(stats, 0, sizeof(*stats));

    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);
#define X(name) stats->name += threads[ii].stats.name;
        THREAD_STATS_FIELDS
#ifdef EXTSTORE
        EXTSTORE_THREAD_STATS_FIELDS
#endif
#ifdef PROXY
        PROXY_THREAD_STATS_FIELDS
#endif
#undef X

        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
#define X(name) stats->slab_stats[sid].name += \
            threads[ii].stats.slab_stats[sid].name;
            SLAB_STATS_FIELDS
#undef X
        }

        for (sid = 0; sid < POWER_LARGEST; sid++) {
            stats->lru_hits[sid] +=
                threads[ii].stats.lru_hits[sid];
            stats->slab_stats[CLEAR_LRU(sid)].get_hits +=
                threads[ii].stats.lru_hits[sid];
        }

        stats->read_buf_count += threads[ii].rbuf_cache->total;
        stats->read_buf_bytes += threads[ii].rbuf_cache->total * READ_BUFFER_SIZE;
        stats->read_buf_bytes_free += threads[ii].rbuf_cache->freecurr * READ_BUFFER_SIZE;
        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}

void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
    int sid;

    memset(out, 0, sizeof(*out));

    for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
#define X(name) out->name += stats->slab_stats[sid].name;
        SLAB_STATS_FIELDS
#undef X
    }
}

/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 */
void memcached_thread_init(int nthreads, void *arg) {
    int         i;
    int         power;

    for (i = 0; i < POWER_LARGEST; i++) {
        pthread_mutex_init(&lru_locks[i], NULL);
    }
    pthread_mutex_init(&worker_hang_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    /* Want a wide lock table, but don't waste memory */
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else if (nthreads <= 10) {
        power = 13;
    } else if (nthreads <= 20) {
        power = 14;
    } else {
        /* 32k buckets. just under the hashpower default. */
        power = 15;
    }
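    /* e.g. the default of 4 worker threads (-t 4) lands in the "< 5" bucket:
     * power 12, i.e. hashsize(12) = 4096 item locks. */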

    if (power >= hashpower) {
        fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
        fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
        fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
        exit(1);
    }

    item_lock_count = hashsize(power);
    item_lock_hashpower = power;

    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }

    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    for (i = 0; i < nthreads; i++) {
#ifdef HAVE_EVENTFD
        threads[i].notify_event_fd = eventfd(0, EFD_NONBLOCK);
        if (threads[i].notify_event_fd == -1) {
            perror("failed creating eventfd for worker thread");
            exit(1);
        }
#else
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];
#endif
#ifdef EXTSTORE
        threads[i].storage = arg;
#endif
        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats_state.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}
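
/* For orientation only: the daemon's startup path in memcached.c calls this
 * roughly as
 *
 *   memcached_thread_init(settings.num_threads, storage);
 *
 * (storage being the extstore engine handle, or NULL without EXTSTORE),
 * after settings are parsed and before the listener's event loop starts. */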