"Fossies" - the Fresh Open Source Software Archive 
Member "memcached-1.6.15/thread.c" (21 Feb 2022, 32047 Bytes) of package /linux/www/memcached-1.6.15.tar.gz:
1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Thread management for memcached.
4 */
5 #include "memcached.h"
6 #ifdef EXTSTORE
7 #include "storage.h"
8 #endif
9 #ifdef HAVE_EVENTFD
10 #include <sys/eventfd.h>
11 #endif
12 #ifdef PROXY
13 #include "proto_proxy.h"
14 #endif
15 #include <assert.h>
16 #include <stdio.h>
17 #include <errno.h>
18 #include <stdlib.h>
19 #include <string.h>
20 #include <pthread.h>
21
22 #include "queue.h"
23
24 #ifdef __sun
25 #include <atomic.h>
26 #endif
27
28 #ifdef TLS
29 #include <openssl/ssl.h>
30 #endif
31
32 #define ITEMS_PER_ALLOC 64
33
34 /* An item in the connection queue. */
35 enum conn_queue_item_modes {
36 queue_new_conn, /* brand new connection. */
37 queue_pause, /* pause thread */
38 queue_timeout, /* socket sfd timed out */
39 queue_redispatch, /* return conn from side thread */
40 queue_stop, /* exit thread */
41 queue_return_io, /* returning a pending IO object immediately */
42 #ifdef PROXY
43 queue_proxy_reload, /* signal proxy to reload worker VM */
44 #endif
45 };
46 typedef struct conn_queue_item CQ_ITEM;
47 struct conn_queue_item {
48 int sfd;
49 enum conn_states init_state;
50 int event_flags;
51 int read_buffer_size;
52 enum network_transport transport;
53 enum conn_queue_item_modes mode;
54 conn *c;
55 void *ssl;
56 io_pending_t *io; // IO when used for deferred IO handling.
57 STAILQ_ENTRY(conn_queue_item) i_next;
58 };
59
60 /* A connection queue. */
61 typedef struct conn_queue CQ;
62 struct conn_queue {
63 STAILQ_HEAD(conn_ev_head, conn_queue_item) head;
64 pthread_mutex_t lock;
65 cache_t *cache; /* freelisted objects */
66 };
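
The conn_queue/conn_queue_item pair above is built on the BSD-style STAILQ macros pulled in from "queue.h": each CQ_ITEM carries its own forward link (STAILQ_ENTRY) and the head tracks both ends, so tail inserts and head removals are O(1) with no separate node allocation. A minimal standalone sketch of that pattern (not part of thread.c; assumes a platform <sys/queue.h>):

#include <stdio.h>
#include <sys/queue.h>

struct node {
    int value;
    STAILQ_ENTRY(node) link;               /* embedded forward pointer */
};

STAILQ_HEAD(node_head, node);

int main(void) {
    struct node_head head = STAILQ_HEAD_INITIALIZER(head);
    struct node a = { .value = 1 }, b = { .value = 2 };

    STAILQ_INSERT_TAIL(&head, &a, link);   /* producer side: what cq_push() does */
    STAILQ_INSERT_TAIL(&head, &b, link);

    while (!STAILQ_EMPTY(&head)) {         /* consumer side: what cq_pop() does */
        struct node *n = STAILQ_FIRST(&head);
        STAILQ_REMOVE_HEAD(&head, link);
        printf("%d\n", n->value);
    }
    return 0;
}
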
67
68 /* Locks for cache LRU operations */
69 pthread_mutex_t lru_locks[POWER_LARGEST];
70
71 /* Connection lock around accepting new connections */
72 pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;
73
74 #if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
75 pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
76 #endif
77
78 /* Lock for global stats */
79 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
80
81 /* Lock to cause worker threads to hang up after being woken */
82 static pthread_mutex_t worker_hang_lock;
83
84 static pthread_mutex_t *item_locks;
85 /* size of the item lock hash table */
86 static uint32_t item_lock_count;
87 unsigned int item_lock_hashpower;
88 #define hashsize(n) ((unsigned long int)1<<(n))
89 #define hashmask(n) (hashsize(n)-1)
90
91 /*
92 * Each libevent instance has a wakeup pipe, which other threads
93 * can use to signal that they've put a new connection on its queue.
94 */
95 static LIBEVENT_THREAD *threads;
96
97 /*
98 * Number of worker threads that have finished setting themselves up.
99 */
100 static int init_count = 0;
101 static pthread_mutex_t init_lock;
102 static pthread_cond_t init_cond;
103
104 static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item);
105 static void notify_worker_fd(LIBEVENT_THREAD *t, int sfd, enum conn_queue_item_modes mode);
106 static CQ_ITEM *cqi_new(CQ *cq);
107 static void cq_push(CQ *cq, CQ_ITEM *item);
108
109 static void thread_libevent_process(evutil_socket_t fd, short which, void *arg);
110
111 /* item_lock() must be held for an item before any modifications to either its
112 * associated hash bucket, or the structure itself.
113 * LRU modifications must hold the item lock, and the LRU lock.
114 LRUs accessing items must item_trylock() before modifying an item.
115 * Items accessible from an LRU must not be freed or modified
116 * without first locking and removing from the LRU.
117 */
118
119 void item_lock(uint32_t hv) {
120 mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
121 }
122
123 void *item_trylock(uint32_t hv) {
124 pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
125 if (pthread_mutex_trylock(lock) == 0) {
126 return lock;
127 }
128 return NULL;
129 }
130
131 void item_trylock_unlock(void *lock) {
132 mutex_unlock((pthread_mutex_t *) lock);
133 }
134
135 void item_unlock(uint32_t hv) {
136 mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
137 }
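
item_lock() does not lock individual items; it takes one mutex out of a fixed table of item_lock_count stripes, selected by masking the item's hash to the low item_lock_hashpower bits, so two keys contend only when their hashes land in the same stripe. A standalone sketch of the same striping arithmetic (illustrative only; the stripe count of 4096 is an assumption picked for the example):

#include <stdint.h>
#include <stdio.h>
#include <pthread.h>

#define hashsize(n) ((unsigned long int)1 << (n))
#define hashmask(n) (hashsize(n) - 1)

#define LOCK_HASHPOWER 12                      /* assumed: 4096 stripes */
static pthread_mutex_t locks[hashsize(LOCK_HASHPOWER)];

static pthread_mutex_t *stripe_for(uint32_t hv) {
    /* same masking as item_lock(): keep the low LOCK_HASHPOWER bits */
    return &locks[hv & hashmask(LOCK_HASHPOWER)];
}

int main(void) {
    for (unsigned long i = 0; i < hashsize(LOCK_HASHPOWER); i++)
        pthread_mutex_init(&locks[i], NULL);

    uint32_t hv = 0xdeadbeef;                  /* stands in for hash(key, nkey) */
    pthread_mutex_lock(stripe_for(hv));
    /* ... modify the item and its hash bucket ... */
    pthread_mutex_unlock(stripe_for(hv));

    printf("hash %#x -> stripe %lu of %lu\n",
           (unsigned)hv, hv & hashmask(LOCK_HASHPOWER), hashsize(LOCK_HASHPOWER));
    return 0;
}
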
138
139 static void wait_for_thread_registration(int nthreads) {
140 while (init_count < nthreads) {
141 pthread_cond_wait(&init_cond, &init_lock);
142 }
143 }
144
145 static void register_thread_initialized(void) {
146 pthread_mutex_lock(&init_lock);
147 init_count++;
148 pthread_cond_signal(&init_cond);
149 pthread_mutex_unlock(&init_lock);
150 /* Force worker threads to pile up if someone wants us to */
151 pthread_mutex_lock(&worker_hang_lock);
152 pthread_mutex_unlock(&worker_hang_lock);
153 }
154
155 /* Must not be called with any deeper locks held */
156 void pause_threads(enum pause_thread_types type) {
157 int i;
158 bool pause_workers = false;
159
160 switch (type) {
161 case PAUSE_ALL_THREADS:
162 slabs_rebalancer_pause();
163 lru_maintainer_pause();
164 lru_crawler_pause();
165 #ifdef EXTSTORE
166 storage_compact_pause();
167 storage_write_pause();
168 #endif
169 case PAUSE_WORKER_THREADS:
170 pause_workers = true;
171 pthread_mutex_lock(&worker_hang_lock);
172 break;
173 case RESUME_ALL_THREADS:
174 slabs_rebalancer_resume();
175 lru_maintainer_resume();
176 lru_crawler_resume();
177 #ifdef EXTSTORE
178 storage_compact_resume();
179 storage_write_resume();
180 #endif
181 case RESUME_WORKER_THREADS:
182 pthread_mutex_unlock(&worker_hang_lock);
183 break;
184 default:
185 fprintf(stderr, "Unknown lock type: %d\n", type);
186 assert(1 == 0);
187 break;
188 }
189
190 /* Only send a message if we have one. */
191 if (!pause_workers) {
192 return;
193 }
194
195 pthread_mutex_lock(&init_lock);
196 init_count = 0;
197 for (i = 0; i < settings.num_threads; i++) {
198 notify_worker_fd(&threads[i], 0, queue_pause);
199 }
200 wait_for_thread_registration(settings.num_threads);
201 pthread_mutex_unlock(&init_lock);
202 }
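
pause_threads() combines two mechanisms: the init_lock/init_cond/init_count barrier (the same one used at startup) tells the caller when every worker has seen the pause message, and worker_hang_lock keeps them paused, because register_thread_initialized() ends with a lock/unlock of worker_hang_lock that blocks for as long as the coordinator holds it. Note also the deliberate switch fall-through above: PAUSE_ALL_THREADS continues into PAUSE_WORKER_THREADS, and RESUME_ALL_THREADS continues into RESUME_WORKER_THREADS. A minimal self-contained sketch of the hang-lock pattern (illustrative; the worker count and the message printed are arbitrary):

#include <pthread.h>
#include <stdio.h>

#define NWORKERS 4

static pthread_mutex_t hang_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  init_cond = PTHREAD_COND_INITIALIZER;
static int init_count = 0;

static void *worker(void *arg) {
    (void)arg;
    /* report in, then pile up on hang_lock until the coordinator releases it */
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);

    pthread_mutex_lock(&hang_lock);     /* blocks while the coordinator holds it */
    pthread_mutex_unlock(&hang_lock);
    return NULL;
}

int main(void) {
    pthread_t tids[NWORKERS];

    pthread_mutex_lock(&hang_lock);     /* "pause": workers hang after reporting in */
    for (int i = 0; i < NWORKERS; i++)
        pthread_create(&tids[i], NULL, worker, NULL);

    pthread_mutex_lock(&init_lock);
    while (init_count < NWORKERS)       /* wait_for_thread_registration() */
        pthread_cond_wait(&init_cond, &init_lock);
    pthread_mutex_unlock(&init_lock);

    puts("all workers paused; safe to do maintenance");
    pthread_mutex_unlock(&hang_lock);   /* "resume" */

    for (int i = 0; i < NWORKERS; i++)
        pthread_join(tids[i], NULL);
    return 0;
}
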
203
204 // MUST not be called with any deeper locks held
205 // MUST be called only by parent thread
206 // Note: listener thread is the "main" event base, which has exited its
207 // loop in order to call this function.
208 void stop_threads(void) {
209 int i;
210
211 // assoc can call pause_threads(), so we have to stop it first.
212 stop_assoc_maintenance_thread();
213 if (settings.verbose > 0)
214 fprintf(stderr, "stopped assoc\n");
215
216 if (settings.verbose > 0)
217 fprintf(stderr, "asking workers to stop\n");
218
219 pthread_mutex_lock(&worker_hang_lock);
220 pthread_mutex_lock(&init_lock);
221 init_count = 0;
222 for (i = 0; i < settings.num_threads; i++) {
223 notify_worker_fd(&threads[i], 0, queue_stop);
224 }
225 wait_for_thread_registration(settings.num_threads);
226 pthread_mutex_unlock(&init_lock);
227
228 // All of the workers are hung but haven't done cleanup yet.
229
230 if (settings.verbose > 0)
231 fprintf(stderr, "asking background threads to stop\n");
232
233 // stop each side thread.
234 // TODO: Verify these all work if the threads are already stopped
235 stop_item_crawler_thread(CRAWLER_WAIT);
236 if (settings.verbose > 0)
237 fprintf(stderr, "stopped lru crawler\n");
238 if (settings.lru_maintainer_thread) {
239 stop_lru_maintainer_thread();
240 if (settings.verbose > 0)
241 fprintf(stderr, "stopped maintainer\n");
242 }
243 if (settings.slab_reassign) {
244 stop_slab_maintenance_thread();
245 if (settings.verbose > 0)
246 fprintf(stderr, "stopped slab mover\n");
247 }
248 logger_stop();
249 if (settings.verbose > 0)
250 fprintf(stderr, "stopped logger thread\n");
251 stop_conn_timeout_thread();
252 if (settings.verbose > 0)
253 fprintf(stderr, "stopped idle timeout thread\n");
254
255 // Close all connections then let the workers finally exit.
256 if (settings.verbose > 0)
257 fprintf(stderr, "closing connections\n");
258 conn_close_all();
259 pthread_mutex_unlock(&worker_hang_lock);
260 if (settings.verbose > 0)
261 fprintf(stderr, "reaping worker threads\n");
262 for (i = 0; i < settings.num_threads; i++) {
263 pthread_join(threads[i].thread_id, NULL);
264 }
265
266 if (settings.verbose > 0)
267 fprintf(stderr, "all background threads stopped\n");
268
269 // At this point, every background thread must be stopped.
270 }
271
272 /*
273 * Initializes a connection queue.
274 */
275 static void cq_init(CQ *cq) {
276 pthread_mutex_init(&cq->lock, NULL);
277 STAILQ_INIT(&cq->head);
278 cq->cache = cache_create("cq", sizeof(CQ_ITEM), sizeof(char *));
279 if (cq->cache == NULL) {
280 fprintf(stderr, "Failed to create connection queue cache\n");
281 exit(EXIT_FAILURE);
282 }
283 }
284
285 /*
286 * Looks for an item on a connection queue, but doesn't block if there isn't
287 * one.
288 * Returns the item, or NULL if no item is available
289 */
290 static CQ_ITEM *cq_pop(CQ *cq) {
291 CQ_ITEM *item;
292
293 pthread_mutex_lock(&cq->lock);
294 item = STAILQ_FIRST(&cq->head);
295 if (item != NULL) {
296 STAILQ_REMOVE_HEAD(&cq->head, i_next);
297 }
298 pthread_mutex_unlock(&cq->lock);
299
300 return item;
301 }
302
303 /*
304 * Adds an item to a connection queue.
305 */
306 static void cq_push(CQ *cq, CQ_ITEM *item) {
307 pthread_mutex_lock(&cq->lock);
308 STAILQ_INSERT_TAIL(&cq->head, item, i_next);
309 pthread_mutex_unlock(&cq->lock);
310 }
311
312 /*
313 * Returns a fresh connection queue item.
314 */
315 static CQ_ITEM *cqi_new(CQ *cq) {
316 CQ_ITEM *item = cache_alloc(cq->cache);
317 if (item == NULL) {
318 STATS_LOCK();
319 stats.malloc_fails++;
320 STATS_UNLOCK();
321 }
322 return item;
323 }
324
325 /*
326 * Frees a connection queue item (adds it to the freelist.)
327 */
328 static void cqi_free(CQ *cq, CQ_ITEM *item) {
329 cache_free(cq->cache, item);
330 }
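
cq_init(), cqi_new(), cq_push(), cq_pop() and cqi_free() together form a small multi-producer hand-off: a producer takes an item from the queue's freelist cache, fills it in, and pushes it; the owning worker pops and recycles it. A hedged usage sketch of that flow (not actual thread.c code; t and me stand for LIBEVENT_THREAD pointers and sfd for a socket descriptor supplied by the caller):

/* producer side (any thread): allocate, fill in, hand off and wake the worker */
CQ_ITEM *it = cqi_new(t->ev_queue);      /* may return NULL on malloc failure */
if (it != NULL) {
    it->mode = queue_timeout;            /* any conn_queue_item_modes value */
    it->sfd  = sfd;
    notify_worker(t, it);                /* cq_push() plus an eventfd/pipe wakeup, below */
}

/* consumer side (the owning worker, inside thread_libevent_process()) */
CQ_ITEM *got;
while ((got = cq_pop(me->ev_queue)) != NULL) {
    /* ...dispatch on got->mode... */
    cqi_free(me->ev_queue, got);
}
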
331
332 // TODO: Skip notify if queue wasn't empty?
333 // - Requires cq_push() returning a "was empty" flag
334 // - Requires event handling loop to pop the entire queue and work from that
335 // instead of the ev_count work there now.
336 // In testing this does result in a large performance uptick, but it's unclear
337 // how much of that will transfer from a synthetic benchmark.
338 static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item) {
339 cq_push(t->ev_queue, item);
340 #ifdef HAVE_EVENTFD
341 uint64_t u = 1;
342 if (write(t->notify_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
343 perror("failed writing to worker eventfd");
344 /* TODO: This is a fatal problem. Can it ever happen temporarily? */
345 }
346 #else
347 char buf[1] = "c";
348 if (write(t->notify_send_fd, buf, 1) != 1) {
349 perror("Failed writing to notify pipe");
350 /* TODO: This is a fatal problem. Can it ever happen temporarily? */
351 }
352 #endif
353 }
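
The wakeup itself is a single 8-byte write when eventfd is available, or a 1-byte pipe write otherwise. What makes eventfd attractive here is that the kernel adds up the values written, so one read on the worker side reports how many notifications accumulated since the last read (which thread_libevent_process() uses as its loop bound). A standalone, Linux-only sketch of that counting behaviour:

#include <stdint.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/eventfd.h>

int main(void) {
    int efd = eventfd(0, EFD_NONBLOCK);
    if (efd == -1) { perror("eventfd"); return 1; }

    uint64_t one = 1;
    /* three producers each "notify" once */
    for (int i = 0; i < 3; i++) {
        if (write(efd, &one, sizeof(one)) != sizeof(one))
            perror("write");
    }

    uint64_t count = 0;
    /* a single read drains the counter: prints 3 */
    if (read(efd, &count, sizeof(count)) == sizeof(count))
        printf("pending notifications: %llu\n", (unsigned long long)count);

    close(efd);
    return 0;
}
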
354
355 // NOTE: An external func that takes a conn *c might be cleaner overall.
356 static void notify_worker_fd(LIBEVENT_THREAD *t, int sfd, enum conn_queue_item_modes mode) {
357 CQ_ITEM *item;
358 while ( (item = cqi_new(t->ev_queue)) == NULL ) {
359 // NOTE: most callers of this function cannot fail, but mallocs in
360 // theory can fail. Small mallocs essentially never do without also
361 // killing the process. Syscalls can also fail but the original code
362 // never handled this either.
363 // As a compromise, I'm leaving this note and this loop: This alloc
364 // cannot fail, but pre-allocating the data is too much code in an
365 // area I want to keep more lean. If this CQ business becomes a more
366 // generic queue I'll reconsider.
367 }
368
369 item->mode = mode;
370 item->sfd = sfd;
371 notify_worker(t, item);
372 }
373
374 /*
375 * Creates a worker thread.
376 */
377 static void create_worker(void *(*func)(void *), void *arg) {
378 pthread_attr_t attr;
379 int ret;
380
381 pthread_attr_init(&attr);
382
383 if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
384 fprintf(stderr, "Can't create thread: %s\n",
385 strerror(ret));
386 exit(1);
387 }
388 }
389
390 /*
391 * Sets whether or not we accept new connections.
392 */
393 void accept_new_conns(const bool do_accept) {
394 pthread_mutex_lock(&conn_lock);
395 do_accept_new_conns(do_accept);
396 pthread_mutex_unlock(&conn_lock);
397 }
398 /****************************** LIBEVENT THREADS *****************************/
399
400 /*
401 * Set up a thread's information.
402 */
403 static void setup_thread(LIBEVENT_THREAD *me) {
404 #if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101
405 struct event_config *ev_config;
406 ev_config = event_config_new();
407 event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
408 me->base = event_base_new_with_config(ev_config);
409 event_config_free(ev_config);
410 #else
411 me->base = event_init();
412 #endif
413
414 if (! me->base) {
415 fprintf(stderr, "Can't allocate event base\n");
416 exit(1);
417 }
418
419 /* Listen for notifications from other threads */
420 #ifdef HAVE_EVENTFD
421 event_set(&me->notify_event, me->notify_event_fd,
422 EV_READ | EV_PERSIST, thread_libevent_process, me);
423 #else
424 event_set(&me->notify_event, me->notify_receive_fd,
425 EV_READ | EV_PERSIST, thread_libevent_process, me);
426 #endif
427 event_base_set(me->base, &me->notify_event);
428
429 if (event_add(&me->notify_event, 0) == -1) {
430 fprintf(stderr, "Can't monitor libevent notify pipe\n");
431 exit(1);
432 }
433
434 me->ev_queue = malloc(sizeof(struct conn_queue));
435 if (me->ev_queue == NULL) {
436 perror("Failed to allocate memory for connection queue");
437 exit(EXIT_FAILURE);
438 }
439 cq_init(me->ev_queue);
440
441 if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
442 perror("Failed to initialize mutex");
443 exit(EXIT_FAILURE);
444 }
445
446 me->rbuf_cache = cache_create("rbuf", READ_BUFFER_SIZE, sizeof(char *));
447 if (me->rbuf_cache == NULL) {
448 fprintf(stderr, "Failed to create read buffer cache\n");
449 exit(EXIT_FAILURE);
450 }
451 // Note: we were cleanly passing in num_threads before, but this now
452 // relies on settings globals too much.
453 if (settings.read_buf_mem_limit) {
454 int limit = settings.read_buf_mem_limit / settings.num_threads;
455 if (limit < READ_BUFFER_SIZE) {
456 limit = 1;
457 } else {
458 limit = limit / READ_BUFFER_SIZE;
459 }
460 cache_set_limit(me->rbuf_cache, limit);
461 }
462
463 me->io_cache = cache_create("io", sizeof(io_pending_t), sizeof(char*));
464 if (me->io_cache == NULL) {
465 fprintf(stderr, "Failed to create IO object cache\n");
466 exit(EXIT_FAILURE);
467 }
468 #ifdef TLS
469 if (settings.ssl_enabled) {
470 me->ssl_wbuf = (char *)malloc((size_t)settings.ssl_wbuf_size);
471 if (me->ssl_wbuf == NULL) {
472 fprintf(stderr, "Failed to allocate the SSL write buffer\n");
473 exit(EXIT_FAILURE);
474 }
475 }
476 #endif
477 #ifdef EXTSTORE
478 // me->storage is set just before this function is called.
479 if (me->storage) {
480 thread_io_queue_add(me, IO_QUEUE_EXTSTORE, me->storage,
481 storage_submit_cb, storage_complete_cb, NULL, storage_finalize_cb);
482 }
483 #endif
484 #ifdef PROXY
485 thread_io_queue_add(me, IO_QUEUE_PROXY, settings.proxy_ctx, proxy_submit_cb,
486 proxy_complete_cb, proxy_return_cb, proxy_finalize_cb);
487
488 // TODO: maybe register hooks to be called here from sub-packages? ie;
489 // extstore, TLS, proxy.
490 if (settings.proxy_enabled) {
491 proxy_thread_init(me);
492 }
493 #endif
494 thread_io_queue_add(me, IO_QUEUE_NONE, NULL, NULL, NULL, NULL, NULL);
495 }
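
The read-buffer cache limit set above is per worker: settings.read_buf_mem_limit is split across settings.num_threads and then converted from a byte budget into a number of READ_BUFFER_SIZE buffers, with a floor of one buffer when the per-thread share is smaller than a single buffer. A quick worked sketch of that arithmetic, assuming a READ_BUFFER_SIZE of 16 KB (the real constant lives in memcached.h and is not shown here):

#include <stdio.h>

/* assumed value for the example; the real constant is defined in memcached.h */
#define READ_BUFFER_SIZE (16 * 1024)

static int rbuf_limit(unsigned int mem_limit_bytes, int num_threads) {
    int limit = mem_limit_bytes / num_threads;    /* per-worker byte budget */
    if (limit < READ_BUFFER_SIZE)
        return 1;                                 /* never less than one buffer */
    return limit / READ_BUFFER_SIZE;              /* budget expressed in buffers */
}

int main(void) {
    /* e.g. a 64 MB cap shared by 4 workers -> 1024 buffers per worker */
    printf("%d\n", rbuf_limit(64 * 1024 * 1024, 4));
    return 0;
}
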
496
497 /*
498 * Worker thread: main event loop
499 */
500 static void *worker_libevent(void *arg) {
501 LIBEVENT_THREAD *me = arg;
502
503 /* Any per-thread setup can happen here; memcached_thread_init() will block until
504 * all threads have finished initializing.
505 */
506 me->l = logger_create();
507 me->lru_bump_buf = item_lru_bump_buf_create();
508 if (me->l == NULL || me->lru_bump_buf == NULL) {
509 abort();
510 }
511
512 if (settings.drop_privileges) {
513 drop_worker_privileges();
514 }
515
516 register_thread_initialized();
517
518 event_base_loop(me->base, 0);
519
520 // same mechanism used to watch for all threads exiting.
521 register_thread_initialized();
522
523 event_base_free(me->base);
524 return NULL;
525 }
526
527
528 /*
529 * Processes an incoming "connection event" item. This is called when
530 * input arrives on the libevent wakeup pipe.
531 */
532 // Syscalls can be expensive enough that handling a few of them at once here can
533 // save both throughput and overall latency.
534 #define MAX_PIPE_EVENTS 32
535 static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) {
536 LIBEVENT_THREAD *me = arg;
537 CQ_ITEM *item;
538 conn *c;
539 uint64_t ev_count = 0; // max number of events to loop through this run.
540 #ifdef HAVE_EVENTFD
541 // NOTE: unlike pipe we aren't limiting the number of events per read.
542 // However we do limit the number of queue pulls to what the count was at
543 // the time of this function firing.
544 if (read(fd, &ev_count, sizeof(uint64_t)) != sizeof(uint64_t)) {
545 if (settings.verbose > 0)
546 fprintf(stderr, "Can't read from libevent pipe\n");
547 return;
548 }
549 #else
550 char buf[MAX_PIPE_EVENTS];
551
552 ev_count = read(fd, buf, MAX_PIPE_EVENTS);
553 if (ev_count == 0) {
554 if (settings.verbose > 0)
555 fprintf(stderr, "Can't read from libevent pipe\n");
556 return;
557 }
558 #endif
559
560 for (int x = 0; x < ev_count; x++) {
561 item = cq_pop(me->ev_queue);
562 if (item == NULL) {
563 return;
564 }
565
566 switch (item->mode) {
567 case queue_new_conn:
568 c = conn_new(item->sfd, item->init_state, item->event_flags,
569 item->read_buffer_size, item->transport,
570 me->base, item->ssl);
571 if (c == NULL) {
572 if (IS_UDP(item->transport)) {
573 fprintf(stderr, "Can't listen for events on UDP socket\n");
574 exit(1);
575 } else {
576 if (settings.verbose > 0) {
577 fprintf(stderr, "Can't listen for events on fd %d\n",
578 item->sfd);
579 }
580 #ifdef TLS
581 if (item->ssl) {
582 SSL_shutdown(item->ssl);
583 SSL_free(item->ssl);
584 }
585 #endif
586 close(item->sfd);
587 }
588 } else {
589 c->thread = me;
590 conn_io_queue_setup(c);
591 #ifdef TLS
592 if (settings.ssl_enabled && c->ssl != NULL) {
593 assert(c->thread && c->thread->ssl_wbuf);
594 c->ssl_wbuf = c->thread->ssl_wbuf;
595 }
596 #endif
597 }
598 break;
599 case queue_pause:
600 /* we were told to pause and report in */
601 register_thread_initialized();
602 break;
603 case queue_timeout:
604 /* a client socket timed out */
605 conn_close_idle(conns[item->sfd]);
606 break;
607 case queue_redispatch:
608 /* a side thread redispatched a client connection */
609 conn_worker_readd(conns[item->sfd]);
610 break;
611 case queue_stop:
612 /* asked to stop */
613 event_base_loopexit(me->base, NULL);
614 break;
615 case queue_return_io:
616 /* getting an individual IO object back */
617 conn_io_queue_return(item->io);
618 break;
619 #ifdef PROXY
620 case queue_proxy_reload:
621 proxy_worker_reload(settings.proxy_ctx, me);
622 break;
623 #endif
624 }
625
626 cqi_free(me->ev_queue, item);
627 }
628 }
629
630 // NOTE: need better encapsulation.
631 // used by the proxy module to iterate the worker threads.
632 LIBEVENT_THREAD *get_worker_thread(int id) {
633 return &threads[id];
634 }
635
636 /* Which thread we assigned a connection to most recently. */
637 static int last_thread = -1;
638
639 /* Last thread we assigned to a connection based on napi_id */
640 static int last_thread_by_napi_id = -1;
641
642 static LIBEVENT_THREAD *select_thread_round_robin(void)
643 {
644 int tid = (last_thread + 1) % settings.num_threads;
645
646 last_thread = tid;
647
648 return threads + tid;
649 }
650
651 static void reset_threads_napi_id(void)
652 {
653 LIBEVENT_THREAD *thread;
654 int i;
655
656 for (i = 0; i < settings.num_threads; i++) {
657 thread = threads + i;
658 thread->napi_id = 0;
659 }
660
661 last_thread_by_napi_id = -1;
662 }
663
664 /* Select a worker thread based on the NAPI ID of an incoming connection
665 * request. NAPI ID is a globally unique ID that identifies a NIC RX queue
666 * on which a flow is received.
667 */
668 static LIBEVENT_THREAD *select_thread_by_napi_id(int sfd)
669 {
670 LIBEVENT_THREAD *thread;
671 int napi_id, err, i;
672 socklen_t len;
673 int tid = -1;
674
675 len = sizeof(socklen_t);
676 err = getsockopt(sfd, SOL_SOCKET, SO_INCOMING_NAPI_ID, &napi_id, &len);
677 if ((err == -1) || (napi_id == 0)) {
678 STATS_LOCK();
679 stats.round_robin_fallback++;
680 STATS_UNLOCK();
681 return select_thread_round_robin();
682 }
683
684 select:
685 for (i = 0; i < settings.num_threads; i++) {
686 thread = threads + i;
687 if (last_thread_by_napi_id < i) {
688 thread->napi_id = napi_id;
689 last_thread_by_napi_id = i;
690 tid = i;
691 break;
692 }
693 if (thread->napi_id == napi_id) {
694 tid = i;
695 break;
696 }
697 }
698
699 if (tid == -1) {
700 STATS_LOCK();
701 stats.unexpected_napi_ids++;
702 STATS_UNLOCK();
703 reset_threads_napi_id();
704 goto select;
705 }
706
707 return threads + tid;
708 }
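
The napi_id table is built lazily: each newly seen NAPI ID claims the next unused worker, a connection with an already-seen ID is pinned to the same worker, and encountering more distinct IDs than workers wipes the table and starts over (counted in stats.unexpected_napi_ids). A small standalone model of that assignment policy (illustrative only; fixed worker count, no locking or stats):

#include <stdio.h>

#define NTHREADS 4

static int napi_ids[NTHREADS];      /* 0 = slot unused */
static int last_claimed = -1;

static int thread_for_napi_id(int napi_id) {
    for (;;) {
        for (int i = 0; i < NTHREADS; i++) {
            if (last_claimed < i) {          /* first unused slot claims this ID */
                napi_ids[i] = napi_id;
                last_claimed = i;
                return i;
            }
            if (napi_ids[i] == napi_id)      /* already mapped: reuse the worker */
                return i;
        }
        /* more distinct IDs than workers: reset and rebuild the table */
        for (int i = 0; i < NTHREADS; i++)
            napi_ids[i] = 0;
        last_claimed = -1;
    }
}

int main(void) {
    int ids[] = { 7, 9, 7, 12, 3, 9 };
    for (int i = 0; i < 6; i++)
        printf("napi_id %d -> worker %d\n", ids[i], thread_for_napi_id(ids[i]));
    return 0;
}
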
709
710 /*
711 * Dispatches a new connection to another thread. This is only ever called
712 * from the main thread, either during initialization (for UDP) or because
713 * of an incoming connection.
714 */
715 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
716 int read_buffer_size, enum network_transport transport, void *ssl) {
717 CQ_ITEM *item = NULL;
718 LIBEVENT_THREAD *thread;
719
720 if (!settings.num_napi_ids)
721 thread = select_thread_round_robin();
722 else
723 thread = select_thread_by_napi_id(sfd);
724
725 item = cqi_new(thread->ev_queue);
726 if (item == NULL) {
727 close(sfd);
728 /* given that malloc failed this may also fail, but let's try */
729 fprintf(stderr, "Failed to allocate memory for connection object\n");
730 return;
731 }
732
733 item->sfd = sfd;
734 item->init_state = init_state;
735 item->event_flags = event_flags;
736 item->read_buffer_size = read_buffer_size;
737 item->transport = transport;
738 item->mode = queue_new_conn;
739 item->ssl = ssl;
740
741 MEMCACHED_CONN_DISPATCH(sfd, (int64_t)thread->thread_id);
742 notify_worker(thread, item);
743 }
744
745 /*
746 * Re-dispatches a connection back to the original thread. Can be called from
747 * any side thread borrowing a connection.
748 */
749 void redispatch_conn(conn *c) {
750 notify_worker_fd(c->thread, c->sfd, queue_redispatch);
751 }
752
753 void timeout_conn(conn *c) {
754 notify_worker_fd(c->thread, c->sfd, queue_timeout);
755 }
756 #ifdef PROXY
757 void proxy_reload_notify(LIBEVENT_THREAD *t) {
758 notify_worker_fd(t, 0, queue_proxy_reload);
759 }
760 #endif
761
762 void return_io_pending(io_pending_t *io) {
763 CQ_ITEM *item = cqi_new(io->thread->ev_queue);
764 if (item == NULL) {
765 // TODO: how can we avoid this?
766 // In the main case I just loop, since a malloc failure here for a
767 // tiny object that's generally in a fixed size queue is going to
768 // implode shortly.
769 return;
770 }
771
772 item->mode = queue_return_io;
773 item->io = io;
774
775 notify_worker(io->thread, item);
776 }
777
778 /* This misses the allow_new_conns flag :( */
779 void sidethread_conn_close(conn *c) {
780 if (settings.verbose > 1)
781 fprintf(stderr, "<%d connection closing from side thread.\n", c->sfd);
782
783 c->state = conn_closing;
784 // redispatch will see closing flag and properly close connection.
785 redispatch_conn(c);
786 return;
787 }
788
789 /********************************* ITEM ACCESS *******************************/
790
791 /*
792 * Allocates a new item.
793 */
794 item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
795 item *it;
796 /* do_item_alloc handles its own locks */
797 it = do_item_alloc(key, nkey, flags, exptime, nbytes);
798 return it;
799 }
800
801 /*
802 * Returns an item if it hasn't been marked as expired,
803 * lazy-expiring as needed.
804 */
805 item *item_get(const char *key, const size_t nkey, conn *c, const bool do_update) {
806 item *it;
807 uint32_t hv;
808 hv = hash(key, nkey);
809 item_lock(hv);
810 it = do_item_get(key, nkey, hv, c, do_update);
811 item_unlock(hv);
812 return it;
813 }
814
815 // returns an item with the item lock held.
816 // lock will still be held even if return is NULL, allowing caller to replace
817 // an item atomically if desired.
818 item *item_get_locked(const char *key, const size_t nkey, conn *c, const bool do_update, uint32_t *hv) {
819 item *it;
820 *hv = hash(key, nkey);
821 item_lock(*hv);
822 it = do_item_get(key, nkey, *hv, c, do_update);
823 return it;
824 }
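
item_get_locked() differs from item_get() in that it returns with the stripe lock still held, found or not, so the caller can inspect or swap the entry without another thread getting in between. A hedged usage sketch (illustrative; key, nkey and c are assumed to come from the caller, and false is passed for do_update):

uint32_t hv;
item *it = item_get_locked(key, nkey, c, false, &hv);   /* stripe lock is now held */
if (it != NULL) {
    /* ...inspect it, or swap it via do_item_replace(it, new_it, hv)... */
    do_item_remove(it);      /* drop the reference taken by do_item_get() */
}
item_unlock(hv);             /* release the stripe lock on every path */
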
825
826 item *item_touch(const char *key, size_t nkey, uint32_t exptime, conn *c) {
827 item *it;
828 uint32_t hv;
829 hv = hash(key, nkey);
830 item_lock(hv);
831 it = do_item_touch(key, nkey, exptime, hv, c);
832 item_unlock(hv);
833 return it;
834 }
835
836 /*
837 * Links an item into the LRU and hashtable.
838 */
839 int item_link(item *item) {
840 int ret;
841 uint32_t hv;
842
843 hv = hash(ITEM_key(item), item->nkey);
844 item_lock(hv);
845 ret = do_item_link(item, hv);
846 item_unlock(hv);
847 return ret;
848 }
849
850 /*
851 * Decrements the reference count on an item and adds it to the freelist if
852 * needed.
853 */
854 void item_remove(item *item) {
855 uint32_t hv;
856 hv = hash(ITEM_key(item), item->nkey);
857
858 item_lock(hv);
859 do_item_remove(item);
860 item_unlock(hv);
861 }
862
863 /*
864 * Replaces one item with another in the hashtable.
865 * Unprotected by a mutex lock since the core server does not require
866 * it to be thread-safe.
867 */
868 int item_replace(item *old_it, item *new_it, const uint32_t hv) {
869 return do_item_replace(old_it, new_it, hv);
870 }
871
872 /*
873 * Unlinks an item from the LRU and hashtable.
874 */
875 void item_unlink(item *item) {
876 uint32_t hv;
877 hv = hash(ITEM_key(item), item->nkey);
878 item_lock(hv);
879 do_item_unlink(item, hv);
880 item_unlock(hv);
881 }
882
883 /*
884 * Does arithmetic on a numeric item value.
885 */
886 enum delta_result_type add_delta(conn *c, const char *key,
887 const size_t nkey, bool incr,
888 const int64_t delta, char *buf,
889 uint64_t *cas) {
890 enum delta_result_type ret;
891 uint32_t hv;
892
893 hv = hash(key, nkey);
894 item_lock(hv);
895 ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv, NULL);
896 item_unlock(hv);
897 return ret;
898 }
899
900 /*
901 * Stores an item in the cache (high level, obeys set/add/replace semantics)
902 */
903 enum store_item_type store_item(item *item, int comm, conn* c) {
904 enum store_item_type ret;
905 uint32_t hv;
906
907 hv = hash(ITEM_key(item), item->nkey);
908 item_lock(hv);
909 ret = do_store_item(item, comm, c, hv);
910 item_unlock(hv);
911 return ret;
912 }
913
914 /******************************* GLOBAL STATS ******************************/
915
916 void STATS_LOCK() {
917 pthread_mutex_lock(&stats_lock);
918 }
919
920 void STATS_UNLOCK() {
921 pthread_mutex_unlock(&stats_lock);
922 }
923
924 void threadlocal_stats_reset(void) {
925 int ii;
926 for (ii = 0; ii < settings.num_threads; ++ii) {
927 pthread_mutex_lock(&threads[ii].stats.mutex);
928 #define X(name) threads[ii].stats.name = 0;
929 THREAD_STATS_FIELDS
930 #ifdef EXTSTORE
931 EXTSTORE_THREAD_STATS_FIELDS
932 #endif
933 #ifdef PROXY
934 PROXY_THREAD_STATS_FIELDS
935 #endif
936 #undef X
937
938 memset(&threads[ii].stats.slab_stats, 0,
939 sizeof(threads[ii].stats.slab_stats));
940 memset(&threads[ii].stats.lru_hits, 0,
941 sizeof(uint64_t) * POWER_LARGEST);
942
943 pthread_mutex_unlock(&threads[ii].stats.mutex);
944 }
945 }
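
threadlocal_stats_reset() (and the aggregation below) uses an X-macro: THREAD_STATS_FIELDS in memcached.h is a list of X(name) entries, and each call site defines X to mean "zero this field" or "sum this field" before expanding the list, so adding a counter means touching only the field list. A tiny self-contained sketch of the pattern (the field names here are invented for the example):

#include <stdio.h>

/* one central list of fields (in memcached this is THREAD_STATS_FIELDS) */
#define STATS_FIELDS \
    X(get_cmds)      \
    X(set_cmds)      \
    X(get_misses)

struct stats {
#define X(name) unsigned long long name;
    STATS_FIELDS
#undef X
};

static void stats_reset(struct stats *s) {
#define X(name) s->name = 0;
    STATS_FIELDS
#undef X
}

static void stats_add(struct stats *out, const struct stats *in) {
#define X(name) out->name += in->name;
    STATS_FIELDS
#undef X
}

int main(void) {
    struct stats a = { 1, 2, 3 }, total;
    stats_reset(&total);
    stats_add(&total, &a);
    stats_add(&total, &a);
    printf("%llu %llu %llu\n", total.get_cmds, total.set_cmds, total.get_misses);
    return 0;
}
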
946
947 void threadlocal_stats_aggregate(struct thread_stats *stats) {
948 int ii, sid;
949
950 /* The struct has a mutex, but we can safely set the whole thing
951 * to zero since it is unused when aggregating. */
952 memset(stats, 0, sizeof(*stats));
953
954 for (ii = 0; ii < settings.num_threads; ++ii) {
955 pthread_mutex_lock(&threads[ii].stats.mutex);
956 #define X(name) stats->name += threads[ii].stats.name;
957 THREAD_STATS_FIELDS
958 #ifdef EXTSTORE
959 EXTSTORE_THREAD_STATS_FIELDS
960 #endif
961 #ifdef PROXY
962 PROXY_THREAD_STATS_FIELDS
963 #endif
964 #undef X
965
966 for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
967 #define X(name) stats->slab_stats[sid].name += \
968 threads[ii].stats.slab_stats[sid].name;
969 SLAB_STATS_FIELDS
970 #undef X
971 }
972
973 for (sid = 0; sid < POWER_LARGEST; sid++) {
974 stats->lru_hits[sid] +=
975 threads[ii].stats.lru_hits[sid];
976 stats->slab_stats[CLEAR_LRU(sid)].get_hits +=
977 threads[ii].stats.lru_hits[sid];
978 }
979
980 stats->read_buf_count += threads[ii].rbuf_cache->total;
981 stats->read_buf_bytes += threads[ii].rbuf_cache->total * READ_BUFFER_SIZE;
982 stats->read_buf_bytes_free += threads[ii].rbuf_cache->freecurr * READ_BUFFER_SIZE;
983 pthread_mutex_unlock(&threads[ii].stats.mutex);
984 }
985 }
986
987 void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
988 int sid;
989
990 memset(out, 0, sizeof(*out));
991
992 for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
993 #define X(name) out->name += stats->slab_stats[sid].name;
994 SLAB_STATS_FIELDS
995 #undef X
996 }
997 }
998
999 /*
1000 * Initializes the thread subsystem, creating various worker threads.
1001 *
1002 * nthreads Number of worker event handler threads to spawn
1003 */
1004 void memcached_thread_init(int nthreads, void *arg) {
1005 int i;
1006 int power;
1007
1008 for (i = 0; i < POWER_LARGEST; i++) {
1009 pthread_mutex_init(&lru_locks[i], NULL);
1010 }
1011 pthread_mutex_init(&worker_hang_lock, NULL);
1012
1013 pthread_mutex_init(&init_lock, NULL);
1014 pthread_cond_init(&init_cond, NULL);
1015
1016 /* Want a wide lock table, but don't waste memory */
1017 if (nthreads < 3) {
1018 power = 10;
1019 } else if (nthreads < 4) {
1020 power = 11;
1021 } else if (nthreads < 5) {
1022 power = 12;
1023 } else if (nthreads <= 10) {
1024 power = 13;
1025 } else if (nthreads <= 20) {
1026 power = 14;
1027 } else {
1028 /* 32k buckets. just under the hashpower default. */
1029 power = 15;
1030 }
1031
1032 if (power >= hashpower) {
1033 fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
1034 fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
1035 fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
1036 exit(1);
1037 }
1038
1039 item_lock_count = hashsize(power);
1040 item_lock_hashpower = power;
1041
1042 item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
1043 if (! item_locks) {
1044 perror("Can't allocate item locks");
1045 exit(1);
1046 }
1047 for (i = 0; i < item_lock_count; i++) {
1048 pthread_mutex_init(&item_locks[i], NULL);
1049 }
1050
1051 threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
1052 if (! threads) {
1053 perror("Can't allocate thread descriptors");
1054 exit(1);
1055 }
1056
1057 for (i = 0; i < nthreads; i++) {
1058 #ifdef HAVE_EVENTFD
1059 threads[i].notify_event_fd = eventfd(0, EFD_NONBLOCK);
1060 if (threads[i].notify_event_fd == -1) {
1061 perror("failed creating eventfd for worker thread");
1062 exit(1);
1063 }
1064 #else
1065 int fds[2];
1066 if (pipe(fds)) {
1067 perror("Can't create notify pipe");
1068 exit(1);
1069 }
1070
1071 threads[i].notify_receive_fd = fds[0];
1072 threads[i].notify_send_fd = fds[1];
1073 #endif
1074 #ifdef EXTSTORE
1075 threads[i].storage = arg;
1076 #endif
1077 setup_thread(&threads[i]);
1078 /* Reserve three fds for the libevent base, and two for the pipe */
1079 stats_state.reserved_fds += 5;
1080 }
1081
1082 /* Create threads after we've done all the libevent setup. */
1083 for (i = 0; i < nthreads; i++) {
1084 create_worker(worker_libevent, &threads[i]);
1085 }
1086
1087 /* Wait for all the threads to set themselves up before returning. */
1088 pthread_mutex_lock(&init_lock);
1089 wait_for_thread_registration(nthreads);
1090 pthread_mutex_unlock(&init_lock);
1091 }
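
memcached_thread_init() sizes the item lock table from the worker count: the default of 4 worker threads falls into the nthreads < 5 bucket, so power becomes 12 and hashsize(12) = 4096 mutexes are allocated, and the startup check only requires the hash table's hashpower to stay strictly larger than that. A small sketch of the same sizing ladder (illustrative; it mirrors the branches above rather than calling into memcached):

#include <stdio.h>

/* same ladder as memcached_thread_init(): worker count -> item lock hashpower */
static int item_lock_power(int nthreads) {
    if (nthreads < 3)   return 10;
    if (nthreads < 4)   return 11;
    if (nthreads < 5)   return 12;
    if (nthreads <= 10) return 13;
    if (nthreads <= 20) return 14;
    return 15;                       /* 32k buckets, just under the hashpower default */
}

int main(void) {
    for (int t = 1; t <= 24; t += 3) {
        int power = item_lock_power(t);
        printf("-t %2d -> power %d -> %lu item locks\n",
               t, power, (unsigned long int)1 << power);
    }
    return 0;
}
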
1092