"Fossies" - the Fresh Open Source Software Archive 
Member "memcached-1.6.15/memcached.c" (30 Mar 2022, 224737 Bytes) of package /linux/www/memcached-1.6.15.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style:
standard) with prefixed line numbers and
code folding option.
Alternatively you can here
view or
download the uninterpreted source code file.
For more information about "memcached.c" see the
Fossies "Dox" file reference documentation and the latest
Fossies "Diffs" side-by-side code changes report:
1.6.14_vs_1.6.15.
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * memcached - memory caching daemon
 *
 * https://www.memcached.org/
 *
 * Copyright 2003 Danga Interactive, Inc. All rights reserved.
 *
 * Use and distribution licensed under the BSD license. See
 * the LICENSE file for full text.
 *
 * Authors:
 *     Anatoly Vorobey <mellon@pobox.com>
 *     Brad Fitzpatrick <brad@danga.com>
 */
#include "memcached.h"
#include "storage.h"
#include "authfile.h"
#include "restart.h"
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <signal.h>
#include <sys/param.h>
#include <sys/resource.h>
#include <sys/uio.h>
#include <ctype.h>
#include <stdarg.h>

/* some POSIX systems need the following definition
 * to get mlockall flags out of sys/mman.h. */
#ifndef _P1003_1B_VISIBLE
#define _P1003_1B_VISIBLE
#endif
#include <pwd.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <assert.h>
#include <sysexits.h>
#include <stddef.h>

#ifdef HAVE_GETOPT_LONG
#include <getopt.h>
#endif

#ifdef TLS
#include "tls.h"
#endif

#include "proto_text.h"
#include "proto_bin.h"
#include "proto_proxy.h"

#if defined(__FreeBSD__)
#include <sys/sysctl.h>
#endif

/*
 * forward declarations
 */
static void drive_machine(conn *c);
static int new_socket(struct addrinfo *ai);
static ssize_t tcp_read(conn *arg, void *buf, size_t count);
static ssize_t tcp_sendmsg(conn *arg, struct msghdr *msg, int flags);
static ssize_t tcp_write(conn *arg, void *buf, size_t count);

enum try_read_result {
    READ_DATA_RECEIVED,
    READ_NO_DATA_RECEIVED,
    READ_ERROR, /** an error occurred (on the socket) (or client closed connection) */
    READ_MEMORY_ERROR /** failed to allocate more memory */
};

static int try_read_command_negotiate(conn *c);
static int try_read_command_udp(conn *c);

static enum try_read_result try_read_network(conn *c);
static enum try_read_result try_read_udp(conn *c);

static int start_conn_timeout_thread();

/* stats */
static void stats_init(void);
static void conn_to_str(const conn *c, char *addr, char *svr_addr);

/* defaults */
static void settings_init(void);

/* event handling, network IO */
static void event_handler(const evutil_socket_t fd, const short which, void *arg);
static void conn_close(conn *c);
static void conn_init(void);
static bool update_event(conn *c, const int new_flags);
static void complete_nread(conn *c);

static void conn_free(conn *c);

/** exported globals **/
struct stats stats;
struct stats_state stats_state;
struct settings settings;
time_t process_started; /* when the process was started */
conn **conns;

struct slab_rebalance slab_rebal;
volatile int slab_rebalance_signal;
#ifdef EXTSTORE
/* hoping this is temporary; I'd prefer to cut globals, but will complete this
 * battle another day.
 */
void *ext_storage = NULL;
#endif
/** file scope variables **/
static conn *listen_conn = NULL;
static int max_fds;
static struct event_base *main_base;

enum transmit_result {
    TRANSMIT_COMPLETE,   /** All done writing. */
    TRANSMIT_INCOMPLETE, /** More data remaining to write. */
    TRANSMIT_SOFT_ERROR, /** Can't write any more right now. */
    TRANSMIT_HARD_ERROR  /** Can't write (c->state is set to conn_closing) */
};

/* Default methods to read from/ write to a socket */
ssize_t tcp_read(conn *c, void *buf, size_t count) {
    assert (c != NULL);
    return read(c->sfd, buf, count);
}

ssize_t tcp_sendmsg(conn *c, struct msghdr *msg, int flags) {
    assert (c != NULL);
    return sendmsg(c->sfd, msg, flags);
}

ssize_t tcp_write(conn *c, void *buf, size_t count) {
    assert (c != NULL);
    return write(c->sfd, buf, count);
}

static enum transmit_result transmit(conn *c);

/* This reduces the latency without adding lots of extra wiring to be able to
 * notify the listener thread of when to listen again.
 * Also, the clock timer could be broken out into its own thread and we
 * can block the listener via a condition.
 */
static volatile bool allow_new_conns = true;
static int stop_main_loop = NOT_STOP;
static struct event maxconnsevent;
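/* Note: the fd == -42 check below appears to be a sentinel value, used when
 * the handler is invoked by hand (to kick off the polling loop) rather than
 * by libevent with a real descriptor. */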
static void maxconns_handler(const evutil_socket_t fd, const short which, void *arg) {
    struct timeval t = {.tv_sec = 0, .tv_usec = 10000};

    if (fd == -42 || allow_new_conns == false) {
        /* reschedule in 10ms if we need to keep polling */
        evtimer_set(&maxconnsevent, maxconns_handler, 0);
        event_base_set(main_base, &maxconnsevent);
        evtimer_add(&maxconnsevent, &t);
    } else {
        evtimer_del(&maxconnsevent);
        accept_new_conns(true);
    }
}

/*
 * given time value that's either unix time or delta from current unix time, return
 * unix time. Use the fact that delta can't exceed one month (and real time value can't
 * be that low).
 */
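/* Example: realtime(300) is at most REALTIME_MAXDELTA (30 days) and is
 * treated as a delta, returning current_time + 300; a full unix timestamp
 * like 1648600000 exceeds the cutoff and is converted to server-relative
 * time as exptime - process_started. */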
rel_time_t realtime(const time_t exptime) {
    /* no. of seconds in 30 days - largest possible delta exptime */

    if (exptime == 0) return 0; /* 0 means never expire */

    if (exptime > REALTIME_MAXDELTA) {
        /* if item expiration is at/before the server started, give it an
           expiration time of 1 second after the server started.
           (because 0 means don't expire).  without this, we'd
           underflow and wrap around to some large value way in the
           future, effectively making items expiring in the past
           really expiring never */
        if (exptime <= process_started)
            return (rel_time_t)1;
        return (rel_time_t)(exptime - process_started);
    } else {
        return (rel_time_t)(exptime + current_time);
    }
}

static void stats_init(void) {
    memset(&stats, 0, sizeof(struct stats));
    memset(&stats_state, 0, sizeof(struct stats_state));
    stats_state.accepting_conns = true; /* assuming we start in this state. */

    /* make the time we started always be 2 seconds before we really
       did, so time(0) - time.started is never zero. if so, things
       like 'settings.oldest_live' which act as booleans as well as
       values are now false in boolean context... */
    process_started = time(0) - ITEM_UPDATE_INTERVAL - 2;
    stats_prefix_init(settings.prefix_delimiter);
}

void stats_reset(void) {
    STATS_LOCK();
    memset(&stats, 0, sizeof(struct stats));
    stats_prefix_clear();
    STATS_UNLOCK();
    threadlocal_stats_reset();
    item_stats_reset();
}

static void settings_init(void) {
    settings.use_cas = true;
    settings.access = 0700;
    settings.port = 11211;
    settings.udpport = 0;
#ifdef TLS
    settings.ssl_enabled = false;
    settings.ssl_ctx = NULL;
    settings.ssl_chain_cert = NULL;
    settings.ssl_key = NULL;
    settings.ssl_verify_mode = SSL_VERIFY_NONE;
    settings.ssl_keyformat = SSL_FILETYPE_PEM;
    settings.ssl_ciphers = NULL;
    settings.ssl_ca_cert = NULL;
    settings.ssl_last_cert_refresh_time = current_time;
    settings.ssl_wbuf_size = 16 * 1024; // default is 16KB (SSL max frame size is 17KB)
    settings.ssl_session_cache = false;
    settings.ssl_min_version = TLS1_2_VERSION;
#endif
    /* By default this string should be NULL for getaddrinfo() */
    settings.inter = NULL;
    settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
    settings.maxconns = 1024; /* to limit connections-related memory to about 5MB */
    settings.verbose = 0;
    settings.oldest_live = 0;
    settings.oldest_cas = 0; /* supplements accuracy of oldest_live */
    settings.evict_to_free = 1; /* push old items out of cache when memory runs out */
    settings.socketpath = NULL; /* by default, not using a unix socket */
    settings.auth_file = NULL; /* by default, not using ASCII authentication tokens */
    settings.factor = 1.25;
    settings.chunk_size = 48; /* space for a modest key and value */
    settings.num_threads = 4; /* N workers */
    settings.num_threads_per_udp = 0;
    settings.prefix_delimiter = ':';
    settings.detail_enabled = 0;
    settings.reqs_per_event = 20;
    settings.backlog = 1024;
    settings.binding_protocol = negotiating_prot;
    settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */
    settings.slab_page_size = 1024 * 1024; /* chunks are split from 1MB pages. */
    settings.slab_chunk_size_max = settings.slab_page_size / 2;
    settings.sasl = false;
    settings.maxconns_fast = true;
    settings.lru_crawler = false;
    settings.lru_crawler_sleep = 100;
    settings.lru_crawler_tocrawl = 0;
    settings.lru_maintainer_thread = false;
    settings.lru_segmented = true;
    settings.hot_lru_pct = 20;
    settings.warm_lru_pct = 40;
    settings.hot_max_factor = 0.2;
    settings.warm_max_factor = 2.0;
    settings.temp_lru = false;
    settings.temporary_ttl = 61;
    settings.idle_timeout = 0; /* disabled */
    settings.hashpower_init = 0;
    settings.slab_reassign = true;
    settings.slab_automove = 1;
    settings.slab_automove_ratio = 0.8;
    settings.slab_automove_window = 30;
    settings.shutdown_command = false;
    settings.tail_repair_time = TAIL_REPAIR_TIME_DEFAULT;
    settings.flush_enabled = true;
    settings.dump_enabled = true;
    settings.crawls_persleep = 1000;
    settings.logger_watcher_buf_size = LOGGER_WATCHER_BUF_SIZE;
    settings.logger_buf_size = LOGGER_BUF_SIZE;
    settings.drop_privileges = false;
    settings.watch_enabled = true;
    settings.read_buf_mem_limit = 0;
#ifdef MEMCACHED_DEBUG
    settings.relaxed_privileges = false;
#endif
    settings.num_napi_ids = 0;
    settings.memory_file = NULL;
}

extern pthread_mutex_t conn_lock;

/* Connection timeout thread bits */
static pthread_t conn_timeout_tid;
static int do_run_conn_timeout_thread;
static pthread_cond_t conn_timeout_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t conn_timeout_lock = PTHREAD_MUTEX_INITIALIZER;

#define CONNS_PER_SLICE 100
static void *conn_timeout_thread(void *arg) {
    int i;
    conn *c;
    rel_time_t oldest_last_cmd;
    int sleep_time;
    int sleep_slice = max_fds / CONNS_PER_SLICE;
    if (sleep_slice == 0)
        sleep_slice = CONNS_PER_SLICE;

    useconds_t timeslice = 1000000 / sleep_slice;
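    /* A full pass over max_fds descriptors thus sleeps roughly one second in
     * total: one timeslice per CONNS_PER_SLICE connections checked. */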

    mutex_lock(&conn_timeout_lock);
    while(do_run_conn_timeout_thread) {
        if (settings.verbose > 2)
            fprintf(stderr, "idle timeout thread at top of connection list\n");

        oldest_last_cmd = current_time;

        for (i = 0; i < max_fds; i++) {
            if ((i % CONNS_PER_SLICE) == 0) {
                if (settings.verbose > 2)
326 fprintf(stderr, "idle timeout thread sleeping for %ulus\n",
327 (unsigned int)timeslice);
                usleep(timeslice);
            }

            if (!conns[i])
                continue;

            c = conns[i];

            if (!IS_TCP(c->transport))
                continue;

            if (c->state != conn_new_cmd && c->state != conn_read)
                continue;

            if ((current_time - c->last_cmd_time) > settings.idle_timeout) {
                timeout_conn(c);
            } else {
                if (c->last_cmd_time < oldest_last_cmd)
                    oldest_last_cmd = c->last_cmd_time;
            }
        }

        /* This is the soonest we could have another connection time out */
        sleep_time = settings.idle_timeout - (current_time - oldest_last_cmd) + 1;
        if (sleep_time <= 0)
            sleep_time = 1;

        if (settings.verbose > 2)
            fprintf(stderr,
                    "idle timeout thread finished pass, sleeping for %ds\n",
                    sleep_time);

        struct timeval now;
        struct timespec to_sleep;
        gettimeofday(&now, NULL);
        to_sleep.tv_sec = now.tv_sec + sleep_time;
        to_sleep.tv_nsec = 0;

        pthread_cond_timedwait(&conn_timeout_cond, &conn_timeout_lock, &to_sleep);
    }

    mutex_unlock(&conn_timeout_lock);
    return NULL;
}

static int start_conn_timeout_thread() {
    int ret;

    if (settings.idle_timeout == 0)
        return -1;

    do_run_conn_timeout_thread = 1;
    if ((ret = pthread_create(&conn_timeout_tid, NULL,
        conn_timeout_thread, NULL)) != 0) {
        fprintf(stderr, "Can't create idle connection timeout thread: %s\n",
            strerror(ret));
        return -1;
    }

    return 0;
}

int stop_conn_timeout_thread(void) {
    if (!do_run_conn_timeout_thread)
        return -1;
    mutex_lock(&conn_timeout_lock);
    do_run_conn_timeout_thread = 0;
    pthread_cond_signal(&conn_timeout_cond);
    mutex_unlock(&conn_timeout_lock);
    pthread_join(conn_timeout_tid, NULL);
    return 0;
}

/*
 * read buffer cache helper functions
 */
static void rbuf_release(conn *c) {
    if (c->rbuf != NULL && c->rbytes == 0 && !IS_UDP(c->transport)) {
        if (c->rbuf_malloced) {
            free(c->rbuf);
            c->rbuf_malloced = false;
        } else {
            do_cache_free(c->thread->rbuf_cache, c->rbuf);
        }
        c->rsize = 0;
        c->rbuf = NULL;
        c->rcurr = NULL;
    }
}

static bool rbuf_alloc(conn *c) {
    if (c->rbuf == NULL) {
        c->rbuf = do_cache_alloc(c->thread->rbuf_cache);
        if (!c->rbuf) {
            THR_STATS_LOCK(c);
            c->thread->stats.read_buf_oom++;
            THR_STATS_UNLOCK(c);
            return false;
        }
        c->rsize = READ_BUFFER_SIZE;
        c->rcurr = c->rbuf;
    }
    return true;
}

// Just for handling huge ASCII multigets.
// The previous system was essentially the same; realloc'ing until big enough,
// then realloc'ing back down after the request finished.
bool rbuf_switch_to_malloc(conn *c) {
    // Might as well start with x2 and work from there.
    size_t size = c->rsize * 2;
    char *tmp = malloc(size);
    if (!tmp)
        return false;

    memcpy(tmp, c->rcurr, c->rbytes);
    do_cache_free(c->thread->rbuf_cache, c->rbuf);

    c->rcurr = c->rbuf = tmp;
    c->rsize = size;
    c->rbuf_malloced = true;
    return true;
}
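// Once the oversized request drains (c->rbytes reaches zero), rbuf_release()
// above sees rbuf_malloced set and free()s this buffer instead of returning
// it to the thread's rbuf_cache, so the connection falls back to cached
// read buffers afterwards.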

/*
 * Initializes the connections array. We don't actually allocate connection
 * structures until they're needed, so as to avoid wasting memory when the
 * maximum connection count is much higher than the actual number of
 * connections.
 *
 * This does end up wasting a few pointers' worth of memory for FDs that are
 * used for things other than connections, but that's worth it in exchange for
 * being able to directly index the conns array by FD.
 */
static void conn_init(void) {
    /* We're unlikely to see an FD much higher than maxconns. */
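    /* dup() returns the lowest-numbered free descriptor, so next_fd is a
     * cheap estimate of how many FDs this process already has open. */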
    int next_fd = dup(1);
    if (next_fd < 0) {
        perror("Failed to duplicate file descriptor\n");
        exit(1);
    }
    int headroom = 10; /* account for extra unexpected open FDs */
    struct rlimit rl;

    max_fds = settings.maxconns + headroom + next_fd;

    /* But if possible, get the actual highest FD we can possibly ever see. */
    if (getrlimit(RLIMIT_NOFILE, &rl) == 0) {
        max_fds = rl.rlim_max;
    } else {
        fprintf(stderr, "Failed to query maximum file descriptor; "
                        "falling back to maxconns\n");
    }

    close(next_fd);

    if ((conns = calloc(max_fds, sizeof(conn *))) == NULL) {
        fprintf(stderr, "Failed to allocate connection structures\n");
        /* This is unrecoverable so bail out early. */
        exit(1);
    }
}

static const char *prot_text(enum protocol prot) {
    char *rv = "unknown";
    switch(prot) {
        case ascii_prot:
            rv = "ascii";
            break;
        case binary_prot:
            rv = "binary";
            break;
        case negotiating_prot:
            rv = "auto-negotiate";
            break;
#ifdef PROXY
        case proxy_prot:
            rv = "proxy";
            break;
#endif
    }
    return rv;
}

void conn_close_idle(conn *c) {
    if (settings.idle_timeout > 0 &&
        (current_time - c->last_cmd_time) > settings.idle_timeout) {
        if (c->state != conn_new_cmd && c->state != conn_read) {
            if (settings.verbose > 1)
                fprintf(stderr,
                    "fd %d wants to timeout, but isn't in read state", c->sfd);
            return;
        }

        if (settings.verbose > 1)
            fprintf(stderr, "Closing idle fd %d\n", c->sfd);

        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.idle_kicks++;
        pthread_mutex_unlock(&c->thread->stats.mutex);

        c->close_reason = IDLE_TIMEOUT_CLOSE;

        conn_set_state(c, conn_closing);
        drive_machine(c);
    }
}

/* bring conn back from a sidethread. could have had its event base moved. */
void conn_worker_readd(conn *c) {
    if (c->state == conn_io_queue) {
        c->io_queues_submitted--;
        // If we're still waiting for other queues to return, don't re-add the
        // connection yet.
        if (c->io_queues_submitted != 0) {
            return;
        }
    }
    c->ev_flags = EV_READ | EV_PERSIST;
    event_set(&c->event, c->sfd, c->ev_flags, event_handler, (void *)c);
    event_base_set(c->thread->base, &c->event);

    // TODO: call conn_cleanup/fail/etc
    if (event_add(&c->event, 0) == -1) {
        perror("event_add");
    }

    // side thread wanted us to close immediately.
    if (c->state == conn_closing) {
        drive_machine(c);
        return;
    } else if (c->state == conn_io_queue) {
        // machine will know how to return based on secondary state.
        drive_machine(c);
    } else {
        conn_set_state(c, conn_new_cmd);
    }
}

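/* IO queue plumbing: each worker thread registers a small table of
 * io_queue_cb_t entries (submit, complete, return, and finalize callbacks
 * per queue type), and each connection mirrors that table with per-connection
 * io_queue_t state (queue type, shared ctx, a stack of pending IOs and a
 * count). The helpers below wire the two together. */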
void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb, io_queue_stack_cb com_cb, io_queue_cb ret_cb, io_queue_cb fin_cb) {
    io_queue_cb_t *q = t->io_queues;
    while (q->type != IO_QUEUE_NONE) {
        q++;
    }
    q->type = type;
    q->ctx = ctx;
    q->submit_cb = cb;
    q->complete_cb = com_cb;
    q->finalize_cb = fin_cb;
    q->return_cb = ret_cb;
    return;
}

void conn_io_queue_setup(conn *c) {
    io_queue_cb_t *qcb = c->thread->io_queues;
    io_queue_t *q = c->io_queues;
    while (qcb->type != IO_QUEUE_NONE) {
        q->type = qcb->type;
        q->ctx = qcb->ctx;
        q->stack_ctx = NULL;
        q->count = 0;
        qcb++;
        q++;
    }
}

// To be called from conn_release_items to ensure the stack ptrs are reset.
static void conn_io_queue_reset(conn *c) {
    for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {
        assert(q->count == 0);
        q->stack_ctx = NULL;
    }
}

io_queue_cb_t *thread_io_queue_get(LIBEVENT_THREAD *t, int type) {
    io_queue_cb_t *q = t->io_queues;
    while (q->type != IO_QUEUE_NONE) {
        if (q->type == type) {
            return q;
        }
        q++;
    }
    return NULL;
}

io_queue_t *conn_io_queue_get(conn *c, int type) {
    io_queue_t *q = c->io_queues;
    while (q->type != IO_QUEUE_NONE) {
        if (q->type == type) {
            return q;
        }
        q++;
    }
    return NULL;
}

// called after returning to the main worker thread.
// users of the queue need to distinguish if the IO was actually consumed or
// not and handle appropriately.
static void conn_io_queue_complete(conn *c) {
    io_queue_t *q = c->io_queues;
    io_queue_cb_t *qcb = c->thread->io_queues;
    while (q->type != IO_QUEUE_NONE) {
        if (q->stack_ctx) {
            qcb->complete_cb(q);
        }
        qcb++;
        q++;
    }
}

// called to return a single IO object to the original worker thread.
void conn_io_queue_return(io_pending_t *io) {
    io_queue_cb_t *q = thread_io_queue_get(io->thread, io->io_queue_type);
    q->return_cb(io);
    return;
}

conn *conn_new(const int sfd, enum conn_states init_state,
               const int event_flags,
               const int read_buffer_size, enum network_transport transport,
               struct event_base *base, void *ssl) {
    conn *c;

    assert(sfd >= 0 && sfd < max_fds);
    c = conns[sfd];

    if (NULL == c) {
        if (!(c = (conn *)calloc(1, sizeof(conn)))) {
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            fprintf(stderr, "Failed to allocate connection object\n");
            return NULL;
        }
        MEMCACHED_CONN_CREATE(c);
        c->read = NULL;
        c->sendmsg = NULL;
        c->write = NULL;
        c->rbuf = NULL;

        c->rsize = read_buffer_size;

        // UDP connections use a persistent static buffer.
        if (c->rsize) {
            c->rbuf = (char *)malloc((size_t)c->rsize);
        }

        if (c->rsize && c->rbuf == NULL) {
            conn_free(c);
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            fprintf(stderr, "Failed to allocate buffers for connection\n");
            return NULL;
        }


        STATS_LOCK();
        stats_state.conn_structs++;
        STATS_UNLOCK();

        c->sfd = sfd;
        conns[sfd] = c;
    }

    c->transport = transport;
    c->protocol = settings.binding_protocol;

    /* unix socket mode doesn't need this, so zeroed out. but why
     * is this done for every command? presumably for UDP
     * mode. */
    if (!settings.socketpath) {
        c->request_addr_size = sizeof(c->request_addr);
    } else {
        c->request_addr_size = 0;
    }

    if (transport == tcp_transport && init_state == conn_new_cmd) {
        if (getpeername(sfd, (struct sockaddr *) &c->request_addr,
                        &c->request_addr_size)) {
            perror("getpeername");
            memset(&c->request_addr, 0, sizeof(c->request_addr));
        }
    }

    if (init_state == conn_new_cmd) {
        LOGGER_LOG(NULL, LOG_CONNEVENTS, LOGGER_CONNECTION_NEW, NULL,
                &c->request_addr, c->request_addr_size, c->transport, 0, sfd);
    }

    if (settings.verbose > 1) {
        if (init_state == conn_listening) {
            fprintf(stderr, "<%d server listening (%s)\n", sfd,
                prot_text(c->protocol));
        } else if (IS_UDP(transport)) {
            fprintf(stderr, "<%d server listening (udp)\n", sfd);
        } else if (c->protocol == negotiating_prot) {
            fprintf(stderr, "<%d new auto-negotiating client connection\n",
                    sfd);
        } else if (c->protocol == ascii_prot) {
            fprintf(stderr, "<%d new ascii client connection.\n", sfd);
        } else if (c->protocol == binary_prot) {
            fprintf(stderr, "<%d new binary client connection.\n", sfd);
        } else {
            fprintf(stderr, "<%d new unknown (%d) client connection\n",
                sfd, c->protocol);
            assert(false);
        }
    }

#ifdef TLS
    c->ssl = NULL;
    c->ssl_wbuf = NULL;
    c->ssl_enabled = false;
#endif
    c->state = init_state;
    c->rlbytes = 0;
    c->cmd = -1;
    c->rbytes = 0;
    c->rcurr = c->rbuf;
    c->ritem = 0;
    c->rbuf_malloced = false;
    c->item_malloced = false;
    c->sasl_started = false;
    c->set_stale = false;
    c->mset_res = false;
    c->close_after_write = false;
    c->last_cmd_time = current_time; /* initialize for idle kicker */
    // wipe all queues.
    memset(c->io_queues, 0, sizeof(c->io_queues));
    c->io_queues_submitted = 0;

    c->item = 0;

    c->noreply = false;

#ifdef TLS
    if (ssl) {
        c->ssl = (SSL*)ssl;
        c->read = ssl_read;
        c->sendmsg = ssl_sendmsg;
        c->write = ssl_write;
        c->ssl_enabled = true;
        SSL_set_info_callback(c->ssl, ssl_callback);
    } else
#else
    // This must be NULL if TLS is not enabled.
    assert(ssl == NULL);
#endif
    {
        c->read = tcp_read;
        c->sendmsg = tcp_sendmsg;
        c->write = tcp_write;
    }

    if (IS_UDP(transport)) {
        c->try_read_command = try_read_command_udp;
    } else {
        switch (c->protocol) {
            case ascii_prot:
                if (settings.auth_file == NULL) {
                    c->authenticated = true;
                    c->try_read_command = try_read_command_ascii;
                } else {
                    c->authenticated = false;
                    c->try_read_command = try_read_command_asciiauth;
                }
                break;
            case binary_prot:
                // binprot handles its own authentication via SASL parsing.
                c->authenticated = false;
                c->try_read_command = try_read_command_binary;
                break;
            case negotiating_prot:
                c->try_read_command = try_read_command_negotiate;
                break;
#ifdef PROXY
            case proxy_prot:
                c->try_read_command = try_read_command_proxy;
                break;
#endif
        }
    }

    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = event_flags;

    if (event_add(&c->event, 0) == -1) {
        perror("event_add");
        return NULL;
    }

    STATS_LOCK();
    stats_state.curr_conns++;
    stats.total_conns++;
    STATS_UNLOCK();

    MEMCACHED_CONN_ALLOCATE(c->sfd);

    return c;
}

void conn_release_items(conn *c) {
    assert(c != NULL);

    if (c->item) {
        if (c->item_malloced) {
            free(c->item);
            c->item_malloced = false;
        } else {
            item_remove(c->item);
        }
        c->item = 0;
    }

    // Cull any unsent responses.
    if (c->resp_head) {
        mc_resp *resp = c->resp_head;
        // r_f() handles the chain maintenance.
        while (resp) {
            // temporary by default. hide behind a debug flag in the future:
            // double free detection. Transmit loops can drop out early, but
            // here we could infinite loop.
            if (resp->free) {
                fprintf(stderr, "ERROR: double free detected during conn_release_items(): [%d] [%s]\n",
                        c->sfd, c->protocol == binary_prot ? "binary" : "ascii");
                // Since this is a critical failure, just leak the memory.
                // If these errors are seen, an abort() can be used instead.
                c->resp_head = NULL;
                c->resp = NULL;
                break;
            }
            resp = resp_finish(c, resp);
        }
        conn_io_queue_reset(c);
    }
}

static void conn_cleanup(conn *c) {
    assert(c != NULL);

    conn_release_items(c);
#ifdef PROXY
    if (c->proxy_coro_ref) {
        proxy_cleanup_conn(c);
    }
#endif
    if (c->sasl_conn) {
        assert(settings.sasl);
        sasl_dispose(&c->sasl_conn);
        c->sasl_conn = NULL;
    }

    if (IS_UDP(c->transport)) {
        conn_set_state(c, conn_read);
    }
}

/*
 * Frees a connection.
 */
void conn_free(conn *c) {
    if (c) {
        assert(c != NULL);
        assert(c->sfd >= 0 && c->sfd < max_fds);

        MEMCACHED_CONN_DESTROY(c);
        conns[c->sfd] = NULL;
        if (c->rbuf)
            free(c->rbuf);
#ifdef TLS
        if (c->ssl_wbuf)
            c->ssl_wbuf = NULL;
#endif

        free(c);
    }
}

static void conn_close(conn *c) {
    assert(c != NULL);

    if (c->thread) {
        LOGGER_LOG(c->thread->l, LOG_CONNEVENTS, LOGGER_CONNECTION_CLOSE, NULL,
                &c->request_addr, c->request_addr_size, c->transport,
                c->close_reason, c->sfd);
    }

    /* delete the event, the socket and the conn */
    event_del(&c->event);

    if (settings.verbose > 1)
        fprintf(stderr, "<%d connection closed.\n", c->sfd);

    conn_cleanup(c);

    // force release of read buffer.
    if (c->thread) {
        c->rbytes = 0;
        rbuf_release(c);
    }

    MEMCACHED_CONN_RELEASE(c->sfd);
    conn_set_state(c, conn_closed);
#ifdef TLS
    if (c->ssl) {
        SSL_shutdown(c->ssl);
        SSL_free(c->ssl);
    }
#endif
    close(c->sfd);
    c->close_reason = 0;
    pthread_mutex_lock(&conn_lock);
    allow_new_conns = true;
    pthread_mutex_unlock(&conn_lock);

    STATS_LOCK();
    stats_state.curr_conns--;
    STATS_UNLOCK();

    return;
}

// Since some connections might be off on side threads and some are managed as
// listeners we need to walk through them all from a central point.
// Must be called with all worker threads hung or in the process of closing.
void conn_close_all(void) {
    int i;
    for (i = 0; i < max_fds; i++) {
        if (conns[i] && conns[i]->state != conn_closed) {
            conn_close(conns[i]);
        }
    }
}

/**
 * Convert a state name to a human readable form.
 */
static const char *state_text(enum conn_states state) {
    const char* const statenames[] = { "conn_listening",
                                       "conn_new_cmd",
                                       "conn_waiting",
                                       "conn_read",
                                       "conn_parse_cmd",
                                       "conn_write",
                                       "conn_nread",
                                       "conn_swallow",
                                       "conn_closing",
                                       "conn_mwrite",
                                       "conn_closed",
                                       "conn_watch",
                                       "conn_io_queue" };
    return statenames[state];
}

/*
 * Sets a connection's current state in the state machine. Any special
 * processing that needs to happen on certain state transitions can
 * happen here.
 */
void conn_set_state(conn *c, enum conn_states state) {
    assert(c != NULL);
    assert(state >= conn_listening && state < conn_max_state);

    if (state != c->state) {
        if (settings.verbose > 2) {
            fprintf(stderr, "%d: going from %s to %s\n",
                    c->sfd, state_text(c->state),
                    state_text(state));
        }

        if (state == conn_write || state == conn_mwrite) {
            MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->resp->wbuf, c->resp->wbytes);
        }
        c->state = state;
    }
}

/*
 * response object helper functions
 */
void resp_reset(mc_resp *resp) {
    if (resp->item) {
        item_remove(resp->item);
        resp->item = NULL;
    }
    if (resp->write_and_free) {
        free(resp->write_and_free);
        resp->write_and_free = NULL;
    }
    resp->wbytes = 0;
    resp->tosend = 0;
    resp->iovcnt = 0;
    resp->chunked_data_iov = 0;
    resp->chunked_total = 0;
    resp->skip = false;
}

void resp_add_iov(mc_resp *resp, const void *buf, int len) {
    assert(resp->iovcnt < MC_RESP_IOVCOUNT);
    int x = resp->iovcnt;
    resp->iov[x].iov_base = (void *)buf;
    resp->iov[x].iov_len = len;
    resp->iovcnt++;
    resp->tosend += len;
}

// Notes that an IOV should be handled as a chunked item header.
// TODO: I'm hoping this isn't a permanent abstraction while I learn what the
// API should be.
void resp_add_chunked_iov(mc_resp *resp, const void *buf, int len) {
    resp->chunked_data_iov = resp->iovcnt;
    resp->chunked_total = len;
    resp_add_iov(resp, buf, len);
}

// resp_allocate and resp_free are a wrapper around read buffers which makes
// read buffers the only network memory to track.
// Normally this would be too excessive. In this case it allows end users to
// track a single memory limit for ephemeral connection buffers.
// Fancy bit twiddling tricks are avoided to help keep this straightforward.
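// Each mc_resp_bundle is carved from a single READ_BUFFER_SIZE allocation
// out of the thread's rbuf_cache and holds MAX_RESP_PER_BUNDLE response
// objects; partially used bundles sit on a doubly linked freelist headed by
// th->open_bundle, from which allocation always happens.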
static mc_resp* resp_allocate(conn *c) {
    LIBEVENT_THREAD *th = c->thread;
    mc_resp *resp = NULL;
    mc_resp_bundle *b = th->open_bundle;

    if (b != NULL) {
        for (int i = 0; i < MAX_RESP_PER_BUNDLE; i++) {
            // loop around starting from the most likely to be free
            int x = (i + b->next_check) % MAX_RESP_PER_BUNDLE;
            if (b->r[x].free) {
                resp = &b->r[x];
                b->next_check = x+1;
                break;
            }
        }

        if (resp != NULL) {
            b->refcount++;
            resp->free = false;
            if (b->refcount == MAX_RESP_PER_BUNDLE) {
                assert(b->prev == NULL);
                // We only allocate off the head. Assign new head.
                th->open_bundle = b->next;
                // Remove ourselves from the list.
                if (b->next) {
                    b->next->prev = 0;
                    b->next = 0;
                }
            }
        }
    }

    if (resp == NULL) {
        assert(th->open_bundle == NULL);
        b = do_cache_alloc(th->rbuf_cache);
        if (b) {
            THR_STATS_LOCK(c);
            c->thread->stats.response_obj_bytes += READ_BUFFER_SIZE;
            THR_STATS_UNLOCK(c);
            b->next_check = 1;
            b->refcount = 1;
            for (int i = 0; i < MAX_RESP_PER_BUNDLE; i++) {
                b->r[i].bundle = b;
                b->r[i].free = true;
            }
            b->next = 0;
            b->prev = 0;
            th->open_bundle = b;
            resp = &b->r[0];
            resp->free = false;
        } else {
            return NULL;
        }
    }

    return resp;
}

static void resp_free(conn *c, mc_resp *resp) {
    LIBEVENT_THREAD *th = c->thread;
    mc_resp_bundle *b = resp->bundle;

    resp->free = true;
    b->refcount--;
    if (b->refcount == 0) {
        if (b == th->open_bundle && b->next == 0) {
            // This is the final bundle. Just hold and reuse to skip init loop
            assert(b->prev == 0);
            b->next_check = 0;
        } else {
            // Assert that we're either in the list or at the head.
            assert((b->next || b->prev) || b == th->open_bundle);

            // unlink from list.
            mc_resp_bundle **head = &th->open_bundle;
            if (*head == b) *head = b->next;
            // Not tracking the tail.
            assert(b->next != b && b->prev != b);

            if (b->next) b->next->prev = b->prev;
            if (b->prev) b->prev->next = b->next;

            // Now completely done with this buffer.
            do_cache_free(th->rbuf_cache, b);
            THR_STATS_LOCK(c);
            c->thread->stats.response_obj_bytes -= READ_BUFFER_SIZE;
            THR_STATS_UNLOCK(c);
        }
    } else {
        mc_resp_bundle **head = &th->open_bundle;
        // NOTE: since we're not tracking tail, latest free ends up in head.
        if (b == th->open_bundle || (b->prev || b->next)) {
            // If we're already linked, leave it in place to save CPU.
        } else {
            // Non-zero refcount, need to link into the freelist.
            b->prev = 0;
            b->next = *head;
            if (b->next) b->next->prev = b;
            *head = b;
        }

    }
}

bool resp_start(conn *c) {
    mc_resp *resp = resp_allocate(c);
    if (!resp) {
        THR_STATS_LOCK(c);
        c->thread->stats.response_obj_oom++;
        THR_STATS_UNLOCK(c);
        return false;
    }
    // handling the stats counters here to simplify testing
    THR_STATS_LOCK(c);
    c->thread->stats.response_obj_count++;
    THR_STATS_UNLOCK(c);
    // Skip zeroing the bundle pointer at the start.
    // TODO: this line is here temporarily to make the code easy to disable.
    // when it's more mature, move the memset into resp_allocate() and have it
    // set the bundle pointer on allocate so this line isn't as complex.
    memset((char *)resp + sizeof(mc_resp_bundle*), 0, sizeof(*resp) - sizeof(mc_resp_bundle*));
    // TODO: this next line works. memset _does_ show up significantly under
    // perf reports due to zeroing out the entire resp->wbuf. before swapping
    // the lines more validation work should be done to ensure wbuf's aren't
    // accidentally reused without being written to.
    //memset((char *)resp + sizeof(mc_resp_bundle*), 0, offsetof(mc_resp, wbuf));
    if (!c->resp_head) {
        c->resp_head = resp;
    }
    if (!c->resp) {
        c->resp = resp;
    } else {
        c->resp->next = resp;
        c->resp = resp;
    }
    if (IS_UDP(c->transport)) {
        // need to hold on to some data for async responses.
        c->resp->request_id = c->request_id;
        c->resp->request_addr = c->request_addr;
        c->resp->request_addr_size = c->request_addr_size;
    }
    return true;
}

// returns next response in chain.
mc_resp* resp_finish(conn *c, mc_resp *resp) {
    mc_resp *next = resp->next;
    if (resp->item) {
        // TODO: cache hash value in resp obj?
        item_remove(resp->item);
        resp->item = NULL;
    }
    if (resp->write_and_free) {
        free(resp->write_and_free);
    }
    if (resp->io_pending) {
        // If we had a pending IO, tell it to internally clean up then return
        // the main object back to our thread cache.
        io_queue_cb_t *qcb = thread_io_queue_get(c->thread, resp->io_pending->io_queue_type);
        qcb->finalize_cb(resp->io_pending);
        do_cache_free(c->thread->io_cache, resp->io_pending);
        resp->io_pending = NULL;
    }
    if (c->resp_head == resp) {
        c->resp_head = next;
    }
    if (c->resp == resp) {
        c->resp = NULL;
    }
    resp_free(c, resp);
    THR_STATS_LOCK(c);
    c->thread->stats.response_obj_count--;
    THR_STATS_UNLOCK(c);
    return next;
}

// tells if connection has a depth of response objects to process.
bool resp_has_stack(conn *c) {
    return c->resp_head->next != NULL ? true : false;
}

void out_string(conn *c, const char *str) {
    size_t len;
    assert(c != NULL);
    mc_resp *resp = c->resp;

    // if response was original filled with something, but we're now writing
    // out an error or similar, have to reset the object first.
    // TODO: since this is often redundant with allocation, how many callers
    // are actually requiring it be reset? Can we fast test by just looking at
    // tosend and reset if nonzero?
    resp_reset(resp);

    if (c->noreply) {
        // TODO: just invalidate the response since nothing's been attempted
        // to send yet?
        resp->skip = true;
        if (settings.verbose > 1)
            fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
        conn_set_state(c, conn_new_cmd);
        return;
    }

    if (settings.verbose > 1)
        fprintf(stderr, ">%d %s\n", c->sfd, str);

    // Fill response object with static string.

    len = strlen(str);
    if ((len + 2) > WRITE_BUFFER_SIZE) {
        /* ought to be always enough. just fail for simplicity */
        str = "SERVER_ERROR output line too long";
        len = strlen(str);
    }

    memcpy(resp->wbuf, str, len);
    memcpy(resp->wbuf + len, "\r\n", 2);
    resp_add_iov(resp, resp->wbuf, len + 2);

    conn_set_state(c, conn_new_cmd);
    return;
}

// For metaget-style ASCII commands. Ignores noreply, ensuring clients see
// protocol level errors.
void out_errstring(conn *c, const char *str) {
    c->noreply = false;
    out_string(c, str);
}

/*
 * Outputs a protocol-specific "out of memory" error. For ASCII clients,
 * this is equivalent to out_string().
 */
void out_of_memory(conn *c, char *ascii_error) {
    const static char error_prefix[] = "SERVER_ERROR ";
    const static int error_prefix_len = sizeof(error_prefix) - 1;

    if (c->protocol == binary_prot) {
        /* Strip off the generic error prefix; it's irrelevant in binary */
        if (!strncmp(ascii_error, error_prefix, error_prefix_len)) {
            ascii_error += error_prefix_len;
        }
        write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, ascii_error, 0);
    } else {
        out_string(c, ascii_error);
    }
}

static void append_bin_stats(const char *key, const uint16_t klen,
                             const char *val, const uint32_t vlen,
                             conn *c) {
    char *buf = c->stats.buffer + c->stats.offset;
    uint32_t bodylen = klen + vlen;
    protocol_binary_response_header header = {
        .response.magic = (uint8_t)PROTOCOL_BINARY_RES,
        .response.opcode = PROTOCOL_BINARY_CMD_STAT,
        .response.keylen = (uint16_t)htons(klen),
        .response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES,
        .response.bodylen = htonl(bodylen),
        .response.opaque = c->opaque
    };

    memcpy(buf, header.bytes, sizeof(header.response));
    buf += sizeof(header.response);

    if (klen > 0) {
        memcpy(buf, key, klen);
        buf += klen;

        if (vlen > 0) {
            memcpy(buf, val, vlen);
        }
    }

    c->stats.offset += sizeof(header.response) + bodylen;
}

static void append_ascii_stats(const char *key, const uint16_t klen,
                               const char *val, const uint32_t vlen,
                               conn *c) {
    char *pos = c->stats.buffer + c->stats.offset;
    uint32_t nbytes = 0;
    int remaining = c->stats.size - c->stats.offset;
    int room = remaining - 1;

    if (klen == 0 && vlen == 0) {
        nbytes = snprintf(pos, room, "END\r\n");
    } else if (vlen == 0) {
        nbytes = snprintf(pos, room, "STAT %s\r\n", key);
    } else {
        nbytes = snprintf(pos, room, "STAT %s %s\r\n", key, val);
    }

    c->stats.offset += nbytes;
}

static bool grow_stats_buf(conn *c, size_t needed) {
    size_t nsize = c->stats.size;
    size_t available = nsize - c->stats.offset;
    bool rv = true;

    /* Special case: No buffer -- need to allocate fresh */
    if (c->stats.buffer == NULL) {
        nsize = 1024;
        available = c->stats.size = c->stats.offset = 0;
    }

    while (needed > available) {
        assert(nsize > 0);
        nsize = nsize << 1;
        available = nsize - c->stats.offset;
    }

    if (nsize != c->stats.size) {
        char *ptr = realloc(c->stats.buffer, nsize);
        if (ptr) {
            c->stats.buffer = ptr;
            c->stats.size = nsize;
        } else {
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            rv = false;
        }
    }

    return rv;
}

void append_stats(const char *key, const uint16_t klen,
                  const char *val, const uint32_t vlen,
                  const void *cookie)
{
    /* value without a key is invalid */
    if (klen == 0 && vlen > 0) {
        return;
    }

    conn *c = (conn*)cookie;

    if (c->protocol == binary_prot) {
        size_t needed = vlen + klen + sizeof(protocol_binary_response_header);
        if (!grow_stats_buf(c, needed)) {
            return;
        }
        append_bin_stats(key, klen, val, vlen, c);
    } else {
        size_t needed = vlen + klen + 10; // 10 == "STAT = \r\n"
        if (!grow_stats_buf(c, needed)) {
            return;
        }
        append_ascii_stats(key, klen, val, vlen, c);
    }

    assert(c->stats.offset <= c->stats.size);
}

static void reset_cmd_handler(conn *c) {
    c->cmd = -1;
    c->substate = bin_no_state;
    if (c->item != NULL) {
        // TODO: Any other way to get here?
        // SASL auth was mistakenly using it. Nothing else should?
        if (c->item_malloced) {
            free(c->item);
            c->item_malloced = false;
        } else {
            item_remove(c->item);
        }
        c->item = NULL;
    }
    if (c->rbytes > 0) {
        conn_set_state(c, conn_parse_cmd);
    } else if (c->resp_head) {
        conn_set_state(c, conn_mwrite);
    } else {
        conn_set_state(c, conn_waiting);
    }
}

static void complete_nread(conn *c) {
    assert(c != NULL);
#ifdef PROXY
    assert(c->protocol == ascii_prot
           || c->protocol == binary_prot
           || c->protocol == proxy_prot);
#else
    assert(c->protocol == ascii_prot
           || c->protocol == binary_prot);
#endif
    if (c->protocol == ascii_prot) {
        complete_nread_ascii(c);
    } else if (c->protocol == binary_prot) {
        complete_nread_binary(c);
#ifdef PROXY
    } else if (c->protocol == proxy_prot) {
        complete_nread_proxy(c);
#endif
    }
}

/* Destination must always be chunked */
/* This should be part of item.c */
static int _store_item_copy_chunks(item *d_it, item *s_it, const int len) {
    item_chunk *dch = (item_chunk *) ITEM_schunk(d_it);
    /* Advance dch until we find free space */
    while (dch->size == dch->used) {
        if (dch->next) {
            dch = dch->next;
        } else {
            break;
        }
    }

    if (s_it->it_flags & ITEM_CHUNKED) {
        int remain = len;
        item_chunk *sch = (item_chunk *) ITEM_schunk(s_it);
        int copied = 0;
        /* Fills dch's to capacity, not straight copy sch in case data is
         * being added or removed (ie append/prepend)
         */
        while (sch && dch && remain) {
            assert(dch->used <= dch->size);
            int todo = (dch->size - dch->used < sch->used - copied)
                ? dch->size - dch->used : sch->used - copied;
            if (remain < todo)
                todo = remain;
            memcpy(dch->data + dch->used, sch->data + copied, todo);
            dch->used += todo;
            copied += todo;
            remain -= todo;
            assert(dch->used <= dch->size);
            if (dch->size == dch->used) {
                item_chunk *tch = do_item_alloc_chunk(dch, remain);
                if (tch) {
                    dch = tch;
                } else {
                    return -1;
                }
            }
            assert(copied <= sch->used);
            if (copied == sch->used) {
                copied = 0;
                sch = sch->next;
            }
        }
        /* assert that the destination had enough space for the source */
        assert(remain == 0);
    } else {
        int done = 0;
        /* Fill dch's via a non-chunked item. */
        while (len > done && dch) {
            int todo = (dch->size - dch->used < len - done)
                ? dch->size - dch->used : len - done;
            //assert(dch->size - dch->used != 0);
            memcpy(dch->data + dch->used, ITEM_data(s_it) + done, todo);
            done += todo;
            dch->used += todo;
            assert(dch->used <= dch->size);
            if (dch->size == dch->used) {
                item_chunk *tch = do_item_alloc_chunk(dch, len - done);
                if (tch) {
                    dch = tch;
                } else {
                    return -1;
                }
            }
        }
        assert(len == done);
    }
    return 0;
}

static int _store_item_copy_data(int comm, item *old_it, item *new_it, item *add_it) {
    if (comm == NREAD_APPEND) {
        if (new_it->it_flags & ITEM_CHUNKED) {
            if (_store_item_copy_chunks(new_it, old_it, old_it->nbytes - 2) == -1 ||
                _store_item_copy_chunks(new_it, add_it, add_it->nbytes) == -1) {
                return -1;
            }
        } else {
            memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
            memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(add_it), add_it->nbytes);
        }
    } else {
        /* NREAD_PREPEND */
        if (new_it->it_flags & ITEM_CHUNKED) {
            if (_store_item_copy_chunks(new_it, add_it, add_it->nbytes - 2) == -1 ||
                _store_item_copy_chunks(new_it, old_it, old_it->nbytes) == -1) {
                return -1;
            }
        } else {
            memcpy(ITEM_data(new_it), ITEM_data(add_it), add_it->nbytes);
            memcpy(ITEM_data(new_it) + add_it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
        }
    }
    return 0;
}

/*
 * Stores an item in the cache according to the semantics of one of the set
 * commands. Protected by the item lock.
 *
 * Returns the state of storage.
 */
enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
    char *key = ITEM_key(it);
    item *old_it = do_item_get(key, it->nkey, hv, c, DONT_UPDATE);
    enum store_item_type stored = NOT_STORED;

    enum cas_result { CAS_NONE, CAS_MATCH, CAS_BADVAL, CAS_STALE, CAS_MISS };
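    /* CAS outcomes: CAS_NONE means the request carried no CAS value;
     * CAS_MATCH means it matches the stored item's CAS; CAS_STALE means it is
     * older but acceptable when set_stale is enabled; CAS_BADVAL is a
     * mismatch; CAS_MISS means a CAS value was supplied but the item is gone. */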

    item *new_it = NULL;
    uint32_t flags;

    /* Do the CAS test up front so we can apply to all store modes */
    enum cas_result cas_res = CAS_NONE;

    bool do_store = false;
    if (old_it != NULL) {
        // Most of the CAS work requires something to compare to.
        uint64_t it_cas = ITEM_get_cas(it);
        uint64_t old_cas = ITEM_get_cas(old_it);
        if (it_cas == 0) {
            cas_res = CAS_NONE;
        } else if (it_cas == old_cas) {
            cas_res = CAS_MATCH;
        } else if (c->set_stale && it_cas < old_cas) {
            cas_res = CAS_STALE;
        } else {
            cas_res = CAS_BADVAL;
        }

        switch (comm) {
            case NREAD_ADD:
                /* add only adds a nonexistent item, but promote to head of LRU */
                do_item_update(old_it);
                break;
            case NREAD_CAS:
                if (cas_res == CAS_MATCH) {
                    // cas validates
                    // it and old_it may belong to different classes.
                    // I'm updating the stats for the one that's getting pushed out
                    pthread_mutex_lock(&c->thread->stats.mutex);
                    c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++;
                    pthread_mutex_unlock(&c->thread->stats.mutex);
                    do_store = true;
                } else if (cas_res == CAS_STALE) {
                    // if we're allowed to set a stale value, CAS must be lower than
                    // the current item's CAS.
                    // This replaces the value, but should preserve TTL, and stale
                    // item marker bit + token sent if exists.
                    it->exptime = old_it->exptime;
                    it->it_flags |= ITEM_STALE;
                    if (old_it->it_flags & ITEM_TOKEN_SENT) {
                        it->it_flags |= ITEM_TOKEN_SENT;
                    }

                    pthread_mutex_lock(&c->thread->stats.mutex);
                    c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_hits++;
                    pthread_mutex_unlock(&c->thread->stats.mutex);
                    do_store = true;
                } else {
                    // NONE or BADVAL are the same for CAS cmd
                    pthread_mutex_lock(&c->thread->stats.mutex);
                    c->thread->stats.slab_stats[ITEM_clsid(old_it)].cas_badval++;
                    pthread_mutex_unlock(&c->thread->stats.mutex);

                    if (settings.verbose > 1) {
                        fprintf(stderr, "CAS: failure: expected %llu, got %llu\n",
                                (unsigned long long)ITEM_get_cas(old_it),
                                (unsigned long long)ITEM_get_cas(it));
                    }
                    stored = EXISTS;
                }
                break;
            case NREAD_APPEND:
            case NREAD_PREPEND:
                if (cas_res != CAS_NONE && cas_res != CAS_MATCH) {
                    stored = EXISTS;
                    break;
                }
#ifdef EXTSTORE
                if ((old_it->it_flags & ITEM_HDR) != 0) {
                    /* block append/prepend from working with extstore-d items.
                     * leave response code to NOT_STORED default */
                    break;
                }
#endif
                /* we have it and old_it here - alloc memory to hold both */
                FLAGS_CONV(old_it, flags);
                new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);

                // OOM trying to copy.
                if (new_it == NULL)
                    break;
                /* copy data from it and old_it to new_it */
                if (_store_item_copy_data(comm, old_it, new_it, it) == -1) {
                    // failed data copy
                    break;
                } else {
                    // refcount of new_it is 1 here. will end up 2 after link.
                    // it's original ref is managed outside of this function
                    it = new_it;
                    do_store = true;
                }
                break;
            case NREAD_REPLACE:
            case NREAD_SET:
                do_store = true;
                break;
        }

        if (do_store) {
            STORAGE_delete(c->thread->storage, old_it);
            item_replace(old_it, it, hv);
            stored = STORED;
        }

        do_item_remove(old_it); /* release our reference */
        if (new_it != NULL) {
            // append/prepend end up with an extra reference for new_it.
            do_item_remove(new_it);
        }
    } else {
        /* No pre-existing item to replace or compare to. */
        if (ITEM_get_cas(it) != 0) {
            /* Asked for a CAS match but nothing to compare it to. */
            cas_res = CAS_MISS;
        }

        switch (comm) {
            case NREAD_ADD:
            case NREAD_SET:
                do_store = true;
                break;
            case NREAD_CAS:
                // LRU expired
                stored = NOT_FOUND;
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.cas_misses++;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                break;
            case NREAD_REPLACE:
            case NREAD_APPEND:
            case NREAD_PREPEND:
                /* Requires an existing item. */
                break;
        }

        if (do_store) {
            do_item_link(it, hv);
            stored = STORED;
        }
    }

    if (stored == STORED) {
        c->cas = ITEM_get_cas(it);
    }
    LOGGER_LOG(c->thread->l, LOG_MUTATIONS, LOGGER_ITEM_STORE, NULL,
            stored, comm, ITEM_key(it), it->nkey, it->nbytes, it->exptime,
            ITEM_clsid(it), c->sfd);

    return stored;
}

/* set up a connection to write a buffer then free it, used for stats */
void write_and_free(conn *c, char *buf, int bytes) {
    if (buf) {
        mc_resp *resp = c->resp;
        resp->write_and_free = buf;
        resp_add_iov(resp, buf, bytes);
        conn_set_state(c, conn_new_cmd);
    } else {
        out_of_memory(c, "SERVER_ERROR out of memory writing stats");
    }
}

void append_stat(const char *name, ADD_STAT add_stats, conn *c,
                 const char *fmt, ...) {
    char val_str[STAT_VAL_LEN];
    int vlen;
    va_list ap;

    assert(name);
    assert(add_stats);
    assert(c);
    assert(fmt);

    va_start(ap, fmt);
    vlen = vsnprintf(val_str, sizeof(val_str) - 1, fmt, ap);
    va_end(ap);

    add_stats(name, strlen(name), val_str, vlen, c);
}

/* return server specific stats only */
void server_stats(ADD_STAT add_stats, conn *c) {
    pid_t pid = getpid();
    rel_time_t now = current_time;

    struct thread_stats thread_stats;
    threadlocal_stats_aggregate(&thread_stats);
    struct slab_stats slab_stats;
    slab_stats_aggregate(&thread_stats, &slab_stats);
#ifndef WIN32
    struct rusage usage;
    getrusage(RUSAGE_SELF, &usage);
#endif /* !WIN32 */

    STATS_LOCK();

    APPEND_STAT("pid", "%lu", (long)pid);
    APPEND_STAT("uptime", "%u", now - ITEM_UPDATE_INTERVAL);
    APPEND_STAT("time", "%ld", now + (long)process_started);
    APPEND_STAT("version", "%s", VERSION);
    APPEND_STAT("libevent", "%s", event_get_version());
    APPEND_STAT("pointer_size", "%d", (int)(8 * sizeof(void *)));

#ifndef WIN32
    append_stat("rusage_user", add_stats, c, "%ld.%06ld",
                (long)usage.ru_utime.tv_sec,
                (long)usage.ru_utime.tv_usec);
    append_stat("rusage_system", add_stats, c, "%ld.%06ld",
                (long)usage.ru_stime.tv_sec,
                (long)usage.ru_stime.tv_usec);
#endif /* !WIN32 */

    APPEND_STAT("max_connections", "%d", settings.maxconns);
    APPEND_STAT("curr_connections", "%llu", (unsigned long long)stats_state.curr_conns - 1);
    APPEND_STAT("total_connections", "%llu", (unsigned long long)stats.total_conns);
    if (settings.maxconns_fast) {
        APPEND_STAT("rejected_connections", "%llu", (unsigned long long)stats.rejected_conns);
    }
    APPEND_STAT("connection_structures", "%u", stats_state.conn_structs);
    APPEND_STAT("response_obj_oom", "%llu", (unsigned long long)thread_stats.response_obj_oom);
    APPEND_STAT("response_obj_count", "%llu", (unsigned long long)thread_stats.response_obj_count);
    APPEND_STAT("response_obj_bytes", "%llu", (unsigned long long)thread_stats.response_obj_bytes);
    APPEND_STAT("read_buf_count", "%llu", (unsigned long long)thread_stats.read_buf_count);
    APPEND_STAT("read_buf_bytes", "%llu", (unsigned long long)thread_stats.read_buf_bytes);
    APPEND_STAT("read_buf_bytes_free", "%llu", (unsigned long long)thread_stats.read_buf_bytes_free);
    APPEND_STAT("read_buf_oom", "%llu", (unsigned long long)thread_stats.read_buf_oom);
    APPEND_STAT("reserved_fds", "%u", stats_state.reserved_fds);
#ifdef PROXY
    if (settings.proxy_enabled) {
        APPEND_STAT("proxy_conn_requests", "%llu", (unsigned long long)thread_stats.proxy_conn_requests);
        APPEND_STAT("proxy_conn_errors", "%llu", (unsigned long long)thread_stats.proxy_conn_errors);
        APPEND_STAT("proxy_conn_oom", "%llu", (unsigned long long)thread_stats.proxy_conn_oom);
        APPEND_STAT("proxy_req_active", "%llu", (unsigned long long)thread_stats.proxy_req_active);
    }
#endif
    APPEND_STAT("cmd_get", "%llu", (unsigned long long)thread_stats.get_cmds);
    APPEND_STAT("cmd_set", "%llu", (unsigned long long)slab_stats.set_cmds);
    APPEND_STAT("cmd_flush", "%llu", (unsigned long long)thread_stats.flush_cmds);
    APPEND_STAT("cmd_touch", "%llu", (unsigned long long)thread_stats.touch_cmds);
    APPEND_STAT("cmd_meta", "%llu", (unsigned long long)thread_stats.meta_cmds);
    APPEND_STAT("get_hits", "%llu", (unsigned long long)slab_stats.get_hits);
    APPEND_STAT("get_misses", "%llu", (unsigned long long)thread_stats.get_misses);
    APPEND_STAT("get_expired", "%llu", (unsigned long long)thread_stats.get_expired);
    APPEND_STAT("get_flushed", "%llu", (unsigned long long)thread_stats.get_flushed);
#ifdef EXTSTORE
    if (c->thread->storage) {
        APPEND_STAT("get_extstore", "%llu", (unsigned long long)thread_stats.get_extstore);
        APPEND_STAT("get_aborted_extstore", "%llu", (unsigned long long)thread_stats.get_aborted_extstore);
        APPEND_STAT("get_oom_extstore", "%llu", (unsigned long long)thread_stats.get_oom_extstore);
        APPEND_STAT("recache_from_extstore", "%llu", (unsigned long long)thread_stats.recache_from_extstore);
        APPEND_STAT("miss_from_extstore", "%llu", (unsigned long long)thread_stats.miss_from_extstore);
        APPEND_STAT("badcrc_from_extstore", "%llu", (unsigned long long)thread_stats.badcrc_from_extstore);
    }
#endif
    APPEND_STAT("delete_misses", "%llu", (unsigned long long)thread_stats.delete_misses);
    APPEND_STAT("delete_hits", "%llu", (unsigned long long)slab_stats.delete_hits);
    APPEND_STAT("incr_misses", "%llu", (unsigned long long)thread_stats.incr_misses);
    APPEND_STAT("incr_hits", "%llu", (unsigned long long)slab_stats.incr_hits);
    APPEND_STAT("decr_misses", "%llu", (unsigned long long)thread_stats.decr_misses);
    APPEND_STAT("decr_hits", "%llu", (unsigned long long)slab_stats.decr_hits);
    APPEND_STAT("cas_misses", "%llu", (unsigned long long)thread_stats.cas_misses);
    APPEND_STAT("cas_hits", "%llu", (unsigned long long)slab_stats.cas_hits);
    APPEND_STAT("cas_badval", "%llu", (unsigned long long)slab_stats.cas_badval);
    APPEND_STAT("touch_hits", "%llu", (unsigned long long)slab_stats.touch_hits);
    APPEND_STAT("touch_misses", "%llu", (unsigned long long)thread_stats.touch_misses);
    APPEND_STAT("store_too_large", "%llu", (unsigned long long)thread_stats.store_too_large);
    APPEND_STAT("store_no_memory", "%llu", (unsigned long long)thread_stats.store_no_memory);
    APPEND_STAT("auth_cmds", "%llu", (unsigned long long)thread_stats.auth_cmds);
    APPEND_STAT("auth_errors", "%llu", (unsigned long long)thread_stats.auth_errors);
    if (settings.idle_timeout) {
        APPEND_STAT("idle_kicks", "%llu", (unsigned long long)thread_stats.idle_kicks);
    }
    APPEND_STAT("bytes_read", "%llu", (unsigned long long)thread_stats.bytes_read);
    APPEND_STAT("bytes_written", "%llu", (unsigned long long)thread_stats.bytes_written);
    APPEND_STAT("limit_maxbytes", "%llu", (unsigned long long)settings.maxbytes);
    APPEND_STAT("accepting_conns", "%u", stats_state.accepting_conns);
    APPEND_STAT("listen_disabled_num", "%llu", (unsigned long long)stats.listen_disabled_num);
    APPEND_STAT("time_in_listen_disabled_us", "%llu", stats.time_in_listen_disabled_us);
    APPEND_STAT("threads", "%d", settings.num_threads);
    APPEND_STAT("conn_yields", "%llu", (unsigned long long)thread_stats.conn_yields);
    APPEND_STAT("hash_power_level", "%u", stats_state.hash_power_level);
    APPEND_STAT("hash_bytes", "%llu", (unsigned long long)stats_state.hash_bytes);
    APPEND_STAT("hash_is_expanding", "%u", stats_state.hash_is_expanding);
    if (settings.slab_reassign) {
        APPEND_STAT("slab_reassign_rescues", "%llu", stats.slab_reassign_rescues);
        APPEND_STAT("slab_reassign_chunk_rescues", "%llu", stats.slab_reassign_chunk_rescues);
        APPEND_STAT("slab_reassign_evictions_nomem", "%llu", stats.slab_reassign_evictions_nomem);
        APPEND_STAT("slab_reassign_inline_reclaim", "%llu", stats.slab_reassign_inline_reclaim);
        APPEND_STAT("slab_reassign_busy_items", "%llu", stats.slab_reassign_busy_items);
        APPEND_STAT("slab_reassign_busy_deletes", "%llu", stats.slab_reassign_busy_deletes);
        APPEND_STAT("slab_reassign_running", "%u", stats_state.slab_reassign_running);
        APPEND_STAT("slabs_moved", "%llu", stats.slabs_moved);
    }
    if (settings.lru_crawler) {
        APPEND_STAT("lru_crawler_running", "%u", stats_state.lru_crawler_running);
        APPEND_STAT("lru_crawler_starts", "%u", stats.lru_crawler_starts);
    }
    if (settings.lru_maintainer_thread) {
        APPEND_STAT("lru_maintainer_juggles", "%llu", (unsigned long long)stats.lru_maintainer_juggles);
    }
    APPEND_STAT("malloc_fails", "%llu",
                (unsigned long long)stats.malloc_fails);
    APPEND_STAT("log_worker_dropped", "%llu", (unsigned long long)stats.log_worker_dropped);
    APPEND_STAT("log_worker_written", "%llu", (unsigned long long)stats.log_worker_written);
    APPEND_STAT("log_watcher_skipped", "%llu", (unsigned long long)stats.log_watcher_skipped);
    APPEND_STAT("log_watcher_sent", "%llu", (unsigned long long)stats.log_watcher_sent);
    APPEND_STAT("log_watchers", "%llu", (unsigned long long)stats_state.log_watchers);
    STATS_UNLOCK();
#ifdef EXTSTORE
    storage_stats(add_stats, c);
#endif
#ifdef PROXY
    proxy_stats(add_stats, c);
#endif
#ifdef TLS
    if (settings.ssl_enabled) {
        if (settings.ssl_session_cache) {
            APPEND_STAT("ssl_new_sessions", "%llu", (unsigned long long)stats.ssl_new_sessions);
        }
        APPEND_STAT("ssl_handshake_errors", "%llu", (unsigned long long)stats.ssl_handshake_errors);
1888 APPEND_STAT("time_since_server_cert_refresh", "%u", now - settings.ssl_last_cert_refresh_time);
1889 }
1890 #endif
1891 APPEND_STAT("unexpected_napi_ids", "%llu", (unsigned long long)stats.unexpected_napi_ids);
1892 APPEND_STAT("round_robin_fallback", "%llu", (unsigned long long)stats.round_robin_fallback);
1893 }
1894
1895 void process_stat_settings(ADD_STAT add_stats, void *c) {
1896 assert(add_stats);
1897 APPEND_STAT("maxbytes", "%llu", (unsigned long long)settings.maxbytes);
1898 APPEND_STAT("maxconns", "%d", settings.maxconns);
1899 APPEND_STAT("tcpport", "%d", settings.port);
1900 APPEND_STAT("udpport", "%d", settings.udpport);
1901 APPEND_STAT("inter", "%s", settings.inter ? settings.inter : "NULL");
1902 APPEND_STAT("verbosity", "%d", settings.verbose);
1903 APPEND_STAT("oldest", "%lu", (unsigned long)settings.oldest_live);
1904 APPEND_STAT("evictions", "%s", settings.evict_to_free ? "on" : "off");
1905 APPEND_STAT("domain_socket", "%s",
1906 settings.socketpath ? settings.socketpath : "NULL");
1907 APPEND_STAT("umask", "%o", settings.access);
1908 APPEND_STAT("shutdown_command", "%s",
1909 settings.shutdown_command ? "yes" : "no");
1910 APPEND_STAT("growth_factor", "%.2f", settings.factor);
1911 APPEND_STAT("chunk_size", "%d", settings.chunk_size);
1912 APPEND_STAT("num_threads", "%d", settings.num_threads);
1913 APPEND_STAT("num_threads_per_udp", "%d", settings.num_threads_per_udp);
1914 APPEND_STAT("stat_key_prefix", "%c", settings.prefix_delimiter);
1915 APPEND_STAT("detail_enabled", "%s",
1916 settings.detail_enabled ? "yes" : "no");
1917 APPEND_STAT("reqs_per_event", "%d", settings.reqs_per_event);
1918 APPEND_STAT("cas_enabled", "%s", settings.use_cas ? "yes" : "no");
1919 APPEND_STAT("tcp_backlog", "%d", settings.backlog);
1920 APPEND_STAT("binding_protocol", "%s",
1921 prot_text(settings.binding_protocol));
1922 APPEND_STAT("auth_enabled_sasl", "%s", settings.sasl ? "yes" : "no");
1923 APPEND_STAT("auth_enabled_ascii", "%s", settings.auth_file ? settings.auth_file : "no");
1924 APPEND_STAT("item_size_max", "%d", settings.item_size_max);
1925 APPEND_STAT("maxconns_fast", "%s", settings.maxconns_fast ? "yes" : "no");
1926 APPEND_STAT("hashpower_init", "%d", settings.hashpower_init);
1927 APPEND_STAT("slab_reassign", "%s", settings.slab_reassign ? "yes" : "no");
1928 APPEND_STAT("slab_automove", "%d", settings.slab_automove);
1929 APPEND_STAT("slab_automove_ratio", "%.2f", settings.slab_automove_ratio);
1930 APPEND_STAT("slab_automove_window", "%u", settings.slab_automove_window);
1931 APPEND_STAT("slab_chunk_max", "%d", settings.slab_chunk_size_max);
1932 APPEND_STAT("lru_crawler", "%s", settings.lru_crawler ? "yes" : "no");
1933 APPEND_STAT("lru_crawler_sleep", "%d", settings.lru_crawler_sleep);
1934 APPEND_STAT("lru_crawler_tocrawl", "%lu", (unsigned long)settings.lru_crawler_tocrawl);
1935 APPEND_STAT("tail_repair_time", "%d", settings.tail_repair_time);
1936 APPEND_STAT("flush_enabled", "%s", settings.flush_enabled ? "yes" : "no");
1937 APPEND_STAT("dump_enabled", "%s", settings.dump_enabled ? "yes" : "no");
1938 APPEND_STAT("hash_algorithm", "%s", settings.hash_algorithm);
1939 APPEND_STAT("lru_maintainer_thread", "%s", settings.lru_maintainer_thread ? "yes" : "no");
1940 APPEND_STAT("lru_segmented", "%s", settings.lru_segmented ? "yes" : "no");
1941 APPEND_STAT("hot_lru_pct", "%d", settings.hot_lru_pct);
1942 APPEND_STAT("warm_lru_pct", "%d", settings.warm_lru_pct);
1943 APPEND_STAT("hot_max_factor", "%.2f", settings.hot_max_factor);
1944 APPEND_STAT("warm_max_factor", "%.2f", settings.warm_max_factor);
1945 APPEND_STAT("temp_lru", "%s", settings.temp_lru ? "yes" : "no");
1946 APPEND_STAT("temporary_ttl", "%u", settings.temporary_ttl);
1947 APPEND_STAT("idle_timeout", "%d", settings.idle_timeout);
1948 APPEND_STAT("watcher_logbuf_size", "%u", settings.logger_watcher_buf_size);
1949 APPEND_STAT("worker_logbuf_size", "%u", settings.logger_buf_size);
1950 APPEND_STAT("read_buf_mem_limit", "%u", settings.read_buf_mem_limit);
1951 APPEND_STAT("track_sizes", "%s", item_stats_sizes_status() ? "yes" : "no");
1952 APPEND_STAT("inline_ascii_response", "%s", "no"); // setting is dead, cannot be yes.
1953 #ifdef HAVE_DROP_PRIVILEGES
1954 APPEND_STAT("drop_privileges", "%s", settings.drop_privileges ? "yes" : "no");
1955 #endif
1956 #ifdef EXTSTORE
1957 APPEND_STAT("ext_item_size", "%u", settings.ext_item_size);
1958 APPEND_STAT("ext_item_age", "%u", settings.ext_item_age);
1959 APPEND_STAT("ext_low_ttl", "%u", settings.ext_low_ttl);
1960 APPEND_STAT("ext_recache_rate", "%u", settings.ext_recache_rate);
1961 APPEND_STAT("ext_wbuf_size", "%u", settings.ext_wbuf_size);
1962 APPEND_STAT("ext_compact_under", "%u", settings.ext_compact_under);
1963 APPEND_STAT("ext_drop_under", "%u", settings.ext_drop_under);
1964 APPEND_STAT("ext_max_sleep", "%u", settings.ext_max_sleep);
1965 APPEND_STAT("ext_max_frag", "%.2f", settings.ext_max_frag);
1966 APPEND_STAT("slab_automove_freeratio", "%.3f", settings.slab_automove_freeratio);
1967 APPEND_STAT("ext_drop_unread", "%s", settings.ext_drop_unread ? "yes" : "no");
1968 #endif
1969 #ifdef TLS
1970 APPEND_STAT("ssl_enabled", "%s", settings.ssl_enabled ? "yes" : "no");
1971 APPEND_STAT("ssl_chain_cert", "%s", settings.ssl_chain_cert);
1972 APPEND_STAT("ssl_key", "%s", settings.ssl_key);
1973 APPEND_STAT("ssl_verify_mode", "%d", settings.ssl_verify_mode);
1974 APPEND_STAT("ssl_keyformat", "%d", settings.ssl_keyformat);
1975 APPEND_STAT("ssl_ciphers", "%s", settings.ssl_ciphers ? settings.ssl_ciphers : "NULL");
1976 APPEND_STAT("ssl_ca_cert", "%s", settings.ssl_ca_cert ? settings.ssl_ca_cert : "NULL");
1977 APPEND_STAT("ssl_wbuf_size", "%u", settings.ssl_wbuf_size);
1978 APPEND_STAT("ssl_session_cache", "%s", settings.ssl_session_cache ? "yes" : "no");
1979 APPEND_STAT("ssl_min_version", "%s", ssl_proto_text(settings.ssl_min_version));
1980 #endif
1981 #ifdef PROXY
1982 APPEND_STAT("proxy_enabled", "%s", settings.proxy_enabled ? "yes" : "no");
1983 APPEND_STAT("proxy_uring_enabled", "%s", settings.proxy_uring ? "yes" : "no");
1984 #endif
1985 APPEND_STAT("num_napi_ids", "%s", settings.num_napi_ids);
1986 APPEND_STAT("memory_file", "%s", settings.memory_file);
1987 }
1988
1989 static int nz_strcmp(int nzlength, const char *nz, const char *z) {
1990     int zlength = strlen(z);
1991 return (zlength == nzlength) && (strncmp(nz, z, zlength) == 0) ? 0 : -1;
1992 }
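/* Usage sketch (illustrative; not called anywhere in the daemon):
 * nz_strcmp exists because stat_type tokens arrive with an explicit
 * length rather than a NUL terminator, so a plain strcmp() could
 * over-read. It returns 0 only when both the lengths and the bytes match:
 *
 *   const char *tok = "itemsXYZ";              // only first 5 bytes valid
 *   assert(nz_strcmp(5, tok, "items") == 0);   // exact-length match
 *   assert(nz_strcmp(5, tok, "item") == -1);   // a bare prefix is not enough
 */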
1993
1994 bool get_stats(const char *stat_type, int nkey, ADD_STAT add_stats, void *c) {
1995 bool ret = true;
1996
1997 if (add_stats != NULL) {
1998 if (!stat_type) {
1999 /* prepare general statistics for the engine */
2000 STATS_LOCK();
2001 APPEND_STAT("bytes", "%llu", (unsigned long long)stats_state.curr_bytes);
2002 APPEND_STAT("curr_items", "%llu", (unsigned long long)stats_state.curr_items);
2003 APPEND_STAT("total_items", "%llu", (unsigned long long)stats.total_items);
2004 STATS_UNLOCK();
2005 APPEND_STAT("slab_global_page_pool", "%u", global_page_pool_size(NULL));
2006 item_stats_totals(add_stats, c);
2007 } else if (nz_strcmp(nkey, stat_type, "items") == 0) {
2008 item_stats(add_stats, c);
2009 } else if (nz_strcmp(nkey, stat_type, "slabs") == 0) {
2010 slabs_stats(add_stats, c);
2011 } else if (nz_strcmp(nkey, stat_type, "sizes") == 0) {
2012 item_stats_sizes(add_stats, c);
2013 } else if (nz_strcmp(nkey, stat_type, "sizes_enable") == 0) {
2014 item_stats_sizes_enable(add_stats, c);
2015 } else if (nz_strcmp(nkey, stat_type, "sizes_disable") == 0) {
2016 item_stats_sizes_disable(add_stats, c);
2017 } else {
2018 ret = false;
2019 }
2020 } else {
2021 ret = false;
2022 }
2023
2024 return ret;
2025 }
2026
2027 static inline void get_conn_text(const conn *c, const int af,
2028 char* addr, struct sockaddr *sock_addr) {
2029 char addr_text[MAXPATHLEN];
2030 addr_text[0] = '\0';
2031 const char *protoname = "?";
2032 unsigned short port = 0;
2033
2034 switch (af) {
2035 case AF_INET:
2036 (void) inet_ntop(af,
2037 &((struct sockaddr_in *)sock_addr)->sin_addr,
2038 addr_text,
2039 sizeof(addr_text) - 1);
2040 port = ntohs(((struct sockaddr_in *)sock_addr)->sin_port);
2041 protoname = IS_UDP(c->transport) ? "udp" : "tcp";
2042 break;
2043
2044 case AF_INET6:
2045 addr_text[0] = '[';
2046 addr_text[1] = '\0';
2047 if (inet_ntop(af,
2048 &((struct sockaddr_in6 *)sock_addr)->sin6_addr,
2049 addr_text + 1,
2050 sizeof(addr_text) - 2)) {
2051 strcat(addr_text, "]");
2052 }
2053 port = ntohs(((struct sockaddr_in6 *)sock_addr)->sin6_port);
2054 protoname = IS_UDP(c->transport) ? "udp6" : "tcp6";
2055 break;
2056
2057 #ifndef DISABLE_UNIX_SOCKET
2058 case AF_UNIX:
2059 {
2060 size_t pathlen = 0;
2061 // this strncpy call originally could piss off an address
2062 // sanitizer; we supplied the size of the dest buf as a limiter,
2063 // but optimized versions of strncpy could read past the end of
2064 // *src while looking for a null terminator. Since buf and
2065 // sun_path here are both on the stack they could even overlap,
2066 // which is "undefined". In all OSS versions of strncpy I could
2067 // find this has no effect; it'll still only copy until the first null
2068 // terminator is found. Thus it's possible to get the OS to
2069 // examine past the end of sun_path but it's unclear to me if this
2070 // can cause any actual problem.
2071 //
2072 // We need a safe_strncpy util function but I'll punt on figuring
2073 // that out for now.
2074 pathlen = sizeof(((struct sockaddr_un *)sock_addr)->sun_path);
2075 if (MAXPATHLEN <= pathlen) {
2076 pathlen = MAXPATHLEN - 1;
2077 }
2078 strncpy(addr_text,
2079 ((struct sockaddr_un *)sock_addr)->sun_path,
2080 pathlen);
2081 addr_text[pathlen] = '\0';
2082 protoname = "unix";
2083 }
2084 break;
2085 #endif /* #ifndef DISABLE_UNIX_SOCKET */
2086 }
2087
2088 if (strlen(addr_text) < 2) {
2089 /* Most likely this is a connected UNIX-domain client which
2090 * has no peer socket address, but there's no portable way
2091 * to tell for sure.
2092 */
2093 sprintf(addr_text, "<AF %d>", af);
2094 }
2095
2096 if (port) {
2097 sprintf(addr, "%s:%s:%u", protoname, addr_text, port);
2098 } else {
2099 sprintf(addr, "%s:%s", protoname, addr_text);
2100 }
2101 }
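/* Example outputs (illustrative; the addresses and paths are hypothetical):
 *   AF_INET  TCP client:   "tcp:127.0.0.1:53422"
 *   AF_INET6 UDP listener: "udp6:[::1]:11211"
 *   AF_UNIX  socket:       "unix:/tmp/memcached.sock"   (no port suffix)
 */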
2102
2103 static void conn_to_str(const conn *c, char *addr, char *svr_addr) {
2104 if (!c) {
2105 strcpy(addr, "<null>");
2106 } else if (c->state == conn_closed) {
2107 strcpy(addr, "<closed>");
2108 } else {
2109 struct sockaddr_in6 local_addr;
2110 struct sockaddr *sock_addr = (void *)&c->request_addr;
2111
2112 /* For listen ports and idle UDP ports, show listen address */
2113 if (c->state == conn_listening ||
2114 (IS_UDP(c->transport) &&
2115 c->state == conn_read)) {
2116 socklen_t local_addr_len = sizeof(local_addr);
2117
2118 if (getsockname(c->sfd,
2119 (struct sockaddr *)&local_addr,
2120 &local_addr_len) == 0) {
2121 sock_addr = (struct sockaddr *)&local_addr;
2122 }
2123 }
2124 get_conn_text(c, sock_addr->sa_family, addr, sock_addr);
2125
2126 if (c->state != conn_listening && !(IS_UDP(c->transport) &&
2127 c->state == conn_read)) {
2128 struct sockaddr_storage svr_sock_addr;
2129 socklen_t svr_addr_len = sizeof(svr_sock_addr);
2130 getsockname(c->sfd, (struct sockaddr *)&svr_sock_addr, &svr_addr_len);
2131 get_conn_text(c, svr_sock_addr.ss_family, svr_addr, (struct sockaddr *)&svr_sock_addr);
2132 }
2133 }
2134 }
2135
2136 void process_stats_conns(ADD_STAT add_stats, void *c) {
2137 int i;
2138 char key_str[STAT_KEY_LEN];
2139 char val_str[STAT_VAL_LEN];
2140 size_t extras_len = sizeof("unix:") + sizeof("65535");
2141 char addr[MAXPATHLEN + extras_len];
2142 char svr_addr[MAXPATHLEN + extras_len];
2143 int klen = 0, vlen = 0;
2144
2145 assert(add_stats);
2146
2147 for (i = 0; i < max_fds; i++) {
2148 if (conns[i]) {
2149 /* This is safe to do unlocked because conns are never freed; the
2150 * worst that'll happen will be a minor inconsistency in the
2151 * output -- not worth the complexity of the locking that'd be
2152 * required to prevent it.
2153 */
2154 if (IS_UDP(conns[i]->transport)) {
2155 APPEND_NUM_STAT(i, "UDP", "%s", "UDP");
2156 }
2157 if (conns[i]->state != conn_closed) {
2158 conn_to_str(conns[i], addr, svr_addr);
2159
2160 APPEND_NUM_STAT(i, "addr", "%s", addr);
2161 if (conns[i]->state != conn_listening &&
2162 !(IS_UDP(conns[i]->transport) && conns[i]->state == conn_read)) {
2163 APPEND_NUM_STAT(i, "listen_addr", "%s", svr_addr);
2164 }
2165 APPEND_NUM_STAT(i, "state", "%s",
2166 state_text(conns[i]->state));
2167 APPEND_NUM_STAT(i, "secs_since_last_cmd", "%d",
2168 current_time - conns[i]->last_cmd_time);
2169 }
2170 }
2171 }
2172 }
2173
2174 #define IT_REFCOUNT_LIMIT 60000
2175 item* limited_get(char *key, size_t nkey, conn *c, uint32_t exptime, bool should_touch, bool do_update, bool *overflow) {
2176 item *it;
2177 if (should_touch) {
2178 it = item_touch(key, nkey, exptime, c);
2179 } else {
2180 it = item_get(key, nkey, c, do_update);
2181 }
2182 if (it && it->refcount > IT_REFCOUNT_LIMIT) {
2183 item_remove(it);
2184 it = NULL;
2185 *overflow = true;
2186 } else {
2187 *overflow = false;
2188 }
2189 return it;
2190 }
2191
2192 // Semantics are different from limited_get; since the item is returned
2193 // locked, the caller can directly change what it needs,
2194 // though it might eventually be a better interface to sink it all into
2195 // items.c.
2196 item* limited_get_locked(char *key, size_t nkey, conn *c, bool do_update, uint32_t *hv, bool *overflow) {
2197 item *it;
2198 it = item_get_locked(key, nkey, c, do_update, hv);
2199 if (it && it->refcount > IT_REFCOUNT_LIMIT) {
2200 do_item_remove(it);
2201 it = NULL;
2202 item_unlock(*hv);
2203 *overflow = true;
2204 } else {
2205 *overflow = false;
2206 }
2207 return it;
2208 }
2209
2210 /*
2211 * adds a delta value to a numeric item.
2212 *
2213 * c connection requesting the operation
2214  * key, nkey key (and length) of the item to adjust
2215 * incr true to increment value, false to decrement
2216 * delta amount to adjust value by
2217 * buf buffer for response string
2218 *
2219  * returns a delta_result_type status; the response string is written into buf.
2220 */
2221 enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey,
2222 const bool incr, const int64_t delta,
2223 char *buf, uint64_t *cas,
2224 const uint32_t hv,
2225 item **it_ret) {
2226 char *ptr;
2227 uint64_t value;
2228 int res;
2229 item *it;
2230
2231 it = do_item_get(key, nkey, hv, c, DONT_UPDATE);
2232 if (!it) {
2233 return DELTA_ITEM_NOT_FOUND;
2234 }
2235
2236     /* Can't delta zero byte values; a 2-byte value is just the "\r\n" terminator. */
2237     /* Also can't delta chunked items; they're too large to be a number. */
2238 #ifdef EXTSTORE
2239 if (it->nbytes <= 2 || (it->it_flags & (ITEM_CHUNKED|ITEM_HDR)) != 0) {
2240 #else
2241 if (it->nbytes <= 2 || (it->it_flags & (ITEM_CHUNKED)) != 0) {
2242 #endif
2243 do_item_remove(it);
2244 return NON_NUMERIC;
2245 }
2246
2247 if (cas != NULL && *cas != 0 && ITEM_get_cas(it) != *cas) {
2248 do_item_remove(it);
2249 return DELTA_ITEM_CAS_MISMATCH;
2250 }
2251
2252 ptr = ITEM_data(it);
2253
2254 if (!safe_strtoull(ptr, &value)) {
2255 do_item_remove(it);
2256 return NON_NUMERIC;
2257 }
2258
2259 if (incr) {
2260 value += delta;
2261 MEMCACHED_COMMAND_INCR(c->sfd, ITEM_key(it), it->nkey, value);
2262 } else {
2263 if(delta > value) {
2264 value = 0;
2265 } else {
2266 value -= delta;
2267 }
2268 MEMCACHED_COMMAND_DECR(c->sfd, ITEM_key(it), it->nkey, value);
2269 }
2270
2271 pthread_mutex_lock(&c->thread->stats.mutex);
2272 if (incr) {
2273 c->thread->stats.slab_stats[ITEM_clsid(it)].incr_hits++;
2274 } else {
2275 c->thread->stats.slab_stats[ITEM_clsid(it)].decr_hits++;
2276 }
2277 pthread_mutex_unlock(&c->thread->stats.mutex);
2278
2279 itoa_u64(value, buf);
2280 res = strlen(buf);
2281 /* refcount == 2 means we are the only ones holding the item, and it is
2282 * linked. We hold the item's lock in this function, so refcount cannot
2283 * increase. */
2284 if (res + 2 <= it->nbytes && it->refcount == 2) { /* replace in-place */
2285 /* When changing the value without replacing the item, we
2286 need to update the CAS on the existing item. */
2287 /* We also need to fiddle it in the sizes tracker in case the tracking
2288 * was enabled at runtime, since it relies on the CAS value to know
2289 * whether to remove an item or not. */
2290 item_stats_sizes_remove(it);
2291 ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
2292 item_stats_sizes_add(it);
2293 memcpy(ITEM_data(it), buf, res);
2294 memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
2295 do_item_update(it);
2296 } else if (it->refcount > 1) {
2297 item *new_it;
2298 uint32_t flags;
2299 FLAGS_CONV(it, flags);
2300 new_it = do_item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, res + 2);
2301 if (new_it == 0) {
2302 do_item_remove(it);
2303 return EOM;
2304 }
2305 memcpy(ITEM_data(new_it), buf, res);
2306 memcpy(ITEM_data(new_it) + res, "\r\n", 2);
2307 item_replace(it, new_it, hv);
2308 // Overwrite the older item's CAS with our new CAS since we're
2309 // returning the CAS of the old item below.
2310 ITEM_set_cas(it, (settings.use_cas) ? ITEM_get_cas(new_it) : 0);
2311 do_item_remove(new_it); /* release our reference */
2312 } else {
2313 /* Should never get here. This means we somehow fetched an unlinked
2314 * item. TODO: Add a counter? */
2315 if (settings.verbose) {
2316 fprintf(stderr, "Tried to do incr/decr on invalid item\n");
2317 }
2318 if (it->refcount == 1)
2319 do_item_remove(it);
2320 return DELTA_ITEM_NOT_FOUND;
2321 }
2322
2323 if (cas) {
2324 *cas = ITEM_get_cas(it); /* swap the incoming CAS value */
2325 }
2326 if (it_ret != NULL) {
2327 *it_ret = it;
2328 } else {
2329 do_item_remove(it); /* release our reference */
2330 }
2331 return OK;
2332 }
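/* A minimal sketch of the arithmetic above (compiled out; this demo
 * function is not part of the daemon). Decrement clamps at zero instead
 * of underflowing, while increment is plain modular arithmetic: */
#if 0
static void add_delta_semantics_demo(void) {
    uint64_t value = 5;
    int64_t delta = 10;
    /* decr path: "if (delta > value) value = 0;" */
    value = ((uint64_t)delta > value) ? 0 : value - (uint64_t)delta;
    assert(value == 0);
    /* incr path: unsigned addition, so UINT64_MAX + 1 wraps to 0 */
    value = UINT64_MAX;
    value += 1;
    assert(value == 0);
}
#endif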
2333
2334 static int try_read_command_negotiate(conn *c) {
2335 assert(c != NULL);
2336 assert(c->protocol == negotiating_prot);
2337 assert(c->rcurr <= (c->rbuf + c->rsize));
2338 assert(c->rbytes > 0);
2339
2340 if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
2341 c->protocol = binary_prot;
2342 c->try_read_command = try_read_command_binary;
2343 } else {
2344 // authentication doesn't work with negotiated protocol.
2345 c->protocol = ascii_prot;
2346 c->try_read_command = try_read_command_ascii;
2347 }
2348
2349 if (settings.verbose > 1) {
2350 fprintf(stderr, "%d: Client using the %s protocol\n", c->sfd,
2351 prot_text(c->protocol));
2352 }
2353
2354 return c->try_read_command(c);
2355 }
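/* Note on the one-byte sniff above: PROTOCOL_BINARY_REQ is the binary
 * protocol's request magic, 0x80. No valid ASCII command can begin with
 * that byte (text commands start with printable letters like "get"), so a
 * single byte is enough to pick a protocol. Equivalent sketch:
 *
 *   c->protocol = ((unsigned char)c->rbuf[0] == 0x80) ? binary_prot
 *                                                     : ascii_prot;
 */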
2356
2357 static int try_read_command_udp(conn *c) {
2358 assert(c != NULL);
2359 assert(c->rcurr <= (c->rbuf + c->rsize));
2360 assert(c->rbytes > 0);
2361
2362 if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
2363 c->protocol = binary_prot;
2364 return try_read_command_binary(c);
2365 } else {
2366 c->protocol = ascii_prot;
2367 return try_read_command_ascii(c);
2368 }
2369 }
2370
2371 /*
2372 * read a UDP request.
2373 */
2374 static enum try_read_result try_read_udp(conn *c) {
2375 int res;
2376
2377 assert(c != NULL);
2378
2379 c->request_addr_size = sizeof(c->request_addr);
2380 res = recvfrom(c->sfd, c->rbuf, c->rsize,
2381 0, (struct sockaddr *)&c->request_addr,
2382 &c->request_addr_size);
2383 if (res > 8) {
2384 unsigned char *buf = (unsigned char *)c->rbuf;
2385 pthread_mutex_lock(&c->thread->stats.mutex);
2386 c->thread->stats.bytes_read += res;
2387 pthread_mutex_unlock(&c->thread->stats.mutex);
2388
2389 /* Beginning of UDP packet is the request ID; save it. */
2390 c->request_id = buf[0] * 256 + buf[1];
2391
2392 /* If this is a multi-packet request, drop it. */
2393 if (buf[4] != 0 || buf[5] != 1) {
2394 return READ_NO_DATA_RECEIVED;
2395 }
2396
2397 /* Don't care about any of the rest of the header. */
2398 res -= 8;
2399 memmove(c->rbuf, c->rbuf + 8, res);
2400
2401 c->rbytes = res;
2402 c->rcurr = c->rbuf;
2403 return READ_DATA_RECEIVED;
2404 }
2405 return READ_NO_DATA_RECEIVED;
2406 }
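/* For reference, the 8-byte frame header parsed by hand above, as an
 * illustrative struct (compiled out; not a type this codebase defines).
 * All fields are big-endian on the wire: */
#if 0
struct udp_frame_header_sketch {
    uint16_t request_id;   /* buf[0..1]: opaque id echoed in the response */
    uint16_t seq_num;      /* buf[2..3]: 0-based datagram index */
    uint16_t total_dgrams; /* buf[4..5]: must be exactly 1 for requests */
    uint16_t reserved;     /* buf[6..7]: always zero */
};
#endif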
2407
2408 /*
2409 * read from network as much as we can, handle buffer overflow and connection
2410 * close.
2411 * before reading, move the remaining incomplete fragment of a command
2412 * (if any) to the beginning of the buffer.
2413 *
2414 * To protect us from someone flooding a connection with bogus data causing
2415 * the connection to eat up all available memory, break out and start looking
2416 * at the data I've got after a number of reallocs...
2417 *
2418 * @return enum try_read_result
2419 */
2420 static enum try_read_result try_read_network(conn *c) {
2421 enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
2422 int res;
2423 int num_allocs = 0;
2424 assert(c != NULL);
2425
2426 if (c->rcurr != c->rbuf) {
2427 if (c->rbytes != 0) /* otherwise there's nothing to copy */
2428 memmove(c->rbuf, c->rcurr, c->rbytes);
2429 c->rcurr = c->rbuf;
2430 }
2431
2432 while (1) {
2433 // TODO: move to rbuf_* func?
2434 if (c->rbytes >= c->rsize && c->rbuf_malloced) {
2435 if (num_allocs == 4) {
2436 return gotdata;
2437 }
2438 ++num_allocs;
2439 char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
2440 if (!new_rbuf) {
2441 STATS_LOCK();
2442 stats.malloc_fails++;
2443 STATS_UNLOCK();
2444 if (settings.verbose > 0) {
2445 fprintf(stderr, "Couldn't realloc input buffer\n");
2446 }
2447 c->rbytes = 0; /* ignore what we read */
2448 out_of_memory(c, "SERVER_ERROR out of memory reading request");
2449 c->close_after_write = true;
2450 return READ_MEMORY_ERROR;
2451 }
2452 c->rcurr = c->rbuf = new_rbuf;
2453 c->rsize *= 2;
2454 }
2455
2456 int avail = c->rsize - c->rbytes;
2457 res = c->read(c, c->rbuf + c->rbytes, avail);
2458 if (res > 0) {
2459 pthread_mutex_lock(&c->thread->stats.mutex);
2460 c->thread->stats.bytes_read += res;
2461 pthread_mutex_unlock(&c->thread->stats.mutex);
2462 gotdata = READ_DATA_RECEIVED;
2463 c->rbytes += res;
2464 if (res == avail && c->rbuf_malloced) {
2465 // Resize rbuf and try a few times if huge ascii multiget.
2466 continue;
2467 } else {
2468 break;
2469 }
2470 }
2471 if (res == 0) {
2472 c->close_reason = NORMAL_CLOSE;
2473 return READ_ERROR;
2474 }
2475 if (res == -1) {
2476 if (errno == EAGAIN || errno == EWOULDBLOCK) {
2477 break;
2478 }
2479 return READ_ERROR;
2480 }
2481 }
2482 return gotdata;
2483 }
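/* Worked example of the realloc cap (assuming a 16KB initial read buffer;
 * the actual size comes from the worker's read buffer cache): at most four
 * doublings per event bound growth at 16K -> 32K -> 64K -> 128K -> 256K,
 * after which we return and process what we already have, so a flood of
 * bogus data cannot grow the buffer without limit in a single event. */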
2484
2485 static bool update_event(conn *c, const int new_flags) {
2486 assert(c != NULL);
2487
2488 struct event_base *base = c->event.ev_base;
2489 if (c->ev_flags == new_flags)
2490 return true;
2491 if (event_del(&c->event) == -1) return false;
2492 event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
2493 event_base_set(base, &c->event);
2494 c->ev_flags = new_flags;
2495 if (event_add(&c->event, 0) == -1) return false;
2496 return true;
2497 }
2498
2499 /*
2500 * Sets whether we are listening for new connections or not.
2501 */
2502 void do_accept_new_conns(const bool do_accept) {
2503 conn *next;
2504
2505 for (next = listen_conn; next; next = next->next) {
2506 if (do_accept) {
2507 update_event(next, EV_READ | EV_PERSIST);
2508 if (listen(next->sfd, settings.backlog) != 0) {
2509 perror("listen");
2510 }
2511 }
2512 else {
2513 update_event(next, 0);
2514 if (listen(next->sfd, 0) != 0) {
2515 perror("listen");
2516 }
2517 }
2518 }
2519
2520 if (do_accept) {
2521 struct timeval maxconns_exited;
2522 uint64_t elapsed_us;
2523 gettimeofday(&maxconns_exited,NULL);
2524 STATS_LOCK();
2525 elapsed_us =
2526 (maxconns_exited.tv_sec - stats.maxconns_entered.tv_sec) * 1000000
2527 + (maxconns_exited.tv_usec - stats.maxconns_entered.tv_usec);
2528 stats.time_in_listen_disabled_us += elapsed_us;
2529 stats_state.accepting_conns = true;
2530 STATS_UNLOCK();
2531 } else {
2532 STATS_LOCK();
2533 stats_state.accepting_conns = false;
2534 gettimeofday(&stats.maxconns_entered,NULL);
2535 stats.listen_disabled_num++;
2536 STATS_UNLOCK();
2537 allow_new_conns = false;
2538 maxconns_handler(-42, 0, 0);
2539 }
2540 }
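/* Worked example for elapsed_us above: entered = {tv_sec 10, tv_usec
 * 999000}, exited = {tv_sec 11, tv_usec 1000} gives
 * (11 - 10) * 1000000 + (1000 - 999000) = 1000000 - 998000 = 2000us.
 * The usec delta can be negative, but the signed sum is still correct
 * before it is widened to uint64_t. */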
2541
2542 #define TRANSMIT_ONE_RESP true
2543 #define TRANSMIT_ALL_RESP false
2544 static int _transmit_pre(conn *c, struct iovec *iovs, int iovused, bool one_resp) {
2545 mc_resp *resp = c->resp_head;
2546 while (resp && iovused + resp->iovcnt < IOV_MAX-1) {
2547 if (resp->skip) {
2548 // Don't actually unchain the resp obj here since it's singly-linked.
2549 // Just let the post function handle it linearly.
2550 resp = resp->next;
2551 continue;
2552 }
2553 if (resp->chunked_data_iov) {
2554 // Handle chunked items specially.
2555 // They spend much more time in send so we can be a bit wasteful
2556 // in rebuilding iovecs for them.
2557 item_chunk *ch = (item_chunk *)ITEM_schunk((item *)resp->iov[resp->chunked_data_iov].iov_base);
2558 int x;
2559 for (x = 0; x < resp->iovcnt; x++) {
2560 // This iov is tracking how far we've copied so far.
2561 if (x == resp->chunked_data_iov) {
2562 int done = resp->chunked_total - resp->iov[x].iov_len;
2563 // Start from the len to allow binprot to cut the \r\n
2564 int todo = resp->iov[x].iov_len;
2565 while (ch && todo > 0 && iovused < IOV_MAX-1) {
2566 int skip = 0;
2567 if (!ch->used) {
2568 ch = ch->next;
2569 continue;
2570 }
2571 // Skip parts we've already sent.
2572 if (done >= ch->used) {
2573 done -= ch->used;
2574 ch = ch->next;
2575 continue;
2576 } else if (done) {
2577 skip = done;
2578 done = 0;
2579 }
2580 iovs[iovused].iov_base = ch->data + skip;
2581 // Stupid binary protocol makes this go negative.
2582 iovs[iovused].iov_len = ch->used - skip > todo ? todo : ch->used - skip;
2583 iovused++;
2584 todo -= ch->used - skip;
2585 ch = ch->next;
2586 }
2587 } else {
2588 iovs[iovused].iov_base = resp->iov[x].iov_base;
2589 iovs[iovused].iov_len = resp->iov[x].iov_len;
2590 iovused++;
2591 }
2592 if (iovused >= IOV_MAX-1)
2593 break;
2594 }
2595 } else {
2596 memcpy(&iovs[iovused], resp->iov, sizeof(struct iovec)*resp->iovcnt);
2597 iovused += resp->iovcnt;
2598 }
2599
2600 // done looking at first response, walk down the chain.
2601 resp = resp->next;
2602 // used for UDP mode: UDP cannot send multiple responses per packet.
2603 if (one_resp)
2604 break;
2605 }
2606 return iovused;
2607 }
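/* Net effect, sketched with hypothetical counts: three pending responses
 * holding 3, 2 and 4 iovecs are flattened into a single sendmsg() carrying
 * all 9, as long as the running total stays under IOV_MAX-1. A response
 * that would cross that limit is simply left on the chain for the next
 * transmit() call. */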
2608
2609 /*
2610 * Decrements and completes responses based on how much data was transmitted.
2611 * Takes the connection and current result bytes.
2612 */
2613 static void _transmit_post(conn *c, ssize_t res) {
2614 // We've written some of the data. Remove the completed
2615 // responses from the list of pending writes.
2616 mc_resp *resp = c->resp_head;
2617 while (resp) {
2618 int x;
2619 if (resp->skip) {
2620 resp = resp_finish(c, resp);
2621 continue;
2622 }
2623
2624 // fastpath check. all small responses should cut here.
2625 if (res >= resp->tosend) {
2626 res -= resp->tosend;
2627 resp = resp_finish(c, resp);
2628 continue;
2629 }
2630
2631 // it's fine to re-check iov's that were zeroed out before.
2632 for (x = 0; x < resp->iovcnt; x++) {
2633 struct iovec *iov = &resp->iov[x];
2634 if (res >= iov->iov_len) {
2635 resp->tosend -= iov->iov_len;
2636 res -= iov->iov_len;
2637 iov->iov_len = 0;
2638 } else {
2639 // Dumb special case for chunked items. Currently tracking
2640 // where to inject the chunked item via iov_base.
2641 // Extra not-great since chunked items can't be the first
2642 // index, so we have to check for non-zero c_d_iov first.
2643 if (!resp->chunked_data_iov || x != resp->chunked_data_iov) {
2644 iov->iov_base = (char *)iov->iov_base + res;
2645 }
2646 iov->iov_len -= res;
2647 resp->tosend -= res;
2648 res = 0;
2649 break;
2650 }
2651 }
2652
2653 // are we done with this response object?
2654 if (resp->tosend == 0) {
2655 resp = resp_finish(c, resp);
2656 } else {
2657 // Jammed up here. This is the new head.
2658 break;
2659 }
2660 }
2661 }
2662
2663 /*
2664 * Transmit the next chunk of data from our list of msgbuf structures.
2665 *
2666 * Returns:
2667 * TRANSMIT_COMPLETE All done writing.
2668 * TRANSMIT_INCOMPLETE More data remaining to write.
2669 * TRANSMIT_SOFT_ERROR Can't write any more right now.
2670 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2671 */
2672 static enum transmit_result transmit(conn *c) {
2673 assert(c != NULL);
2674 struct iovec iovs[IOV_MAX];
2675 struct msghdr msg;
2676 int iovused = 0;
2677
2678 // init the msg.
2679 memset(&msg, 0, sizeof(struct msghdr));
2680 msg.msg_iov = iovs;
2681
2682 iovused = _transmit_pre(c, iovs, iovused, TRANSMIT_ALL_RESP);
2683 if (iovused == 0) {
2684 // Avoid the syscall if we're only handling a noreply.
2685 // Return the response object.
2686 _transmit_post(c, 0);
2687 return TRANSMIT_COMPLETE;
2688 }
2689
2690 // Alright, send.
2691 ssize_t res;
2692 msg.msg_iovlen = iovused;
2693 res = c->sendmsg(c, &msg, 0);
2694 if (res >= 0) {
2695 pthread_mutex_lock(&c->thread->stats.mutex);
2696 c->thread->stats.bytes_written += res;
2697 pthread_mutex_unlock(&c->thread->stats.mutex);
2698
2699 // Decrement any partial IOV's and complete any finished resp's.
2700 _transmit_post(c, res);
2701
2702 if (c->resp_head) {
2703 return TRANSMIT_INCOMPLETE;
2704 } else {
2705 return TRANSMIT_COMPLETE;
2706 }
2707 }
2708
2709 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2710 if (!update_event(c, EV_WRITE | EV_PERSIST)) {
2711 if (settings.verbose > 0)
2712 fprintf(stderr, "Couldn't update event\n");
2713 conn_set_state(c, conn_closing);
2714 return TRANSMIT_HARD_ERROR;
2715 }
2716 return TRANSMIT_SOFT_ERROR;
2717 }
2718 /* if res == -1 and error is not EAGAIN or EWOULDBLOCK,
2719 we have a real error, on which we close the connection */
2720 if (settings.verbose > 0)
2721 perror("Failed to write, and not due to blocking");
2722
2723 conn_set_state(c, conn_closing);
2724 return TRANSMIT_HARD_ERROR;
2725 }
2726
2727 static void build_udp_header(unsigned char *hdr, mc_resp *resp) {
2728     // We need to communicate the total number of packets.
2729     // If udp_total isn't set yet, this is the first time this response is
2730     // building a udp header, so "tosend" still holds the full response size.
2731 if (!resp->udp_total) {
2732 uint32_t total;
2733 total = resp->tosend / UDP_DATA_SIZE;
2734 if (resp->tosend % UDP_DATA_SIZE)
2735 total++;
2736 // The spec doesn't really say what we should do here. It's _probably_
2737 // better to bail out?
2738 if (total > USHRT_MAX) {
2739 total = USHRT_MAX;
2740 }
2741 resp->udp_total = total;
2742 }
2743
2744     // TODO: why doesn't this use hto*() calls and casts?
2745     // this writes the UDP header out byte-by-byte, most significant byte first.
2746 *hdr++ = resp->request_id / 256;
2747 *hdr++ = resp->request_id % 256;
2748 *hdr++ = resp->udp_sequence / 256;
2749 *hdr++ = resp->udp_sequence % 256;
2750 *hdr++ = resp->udp_total / 256;
2751 *hdr++ = resp->udp_total % 256;
2752 *hdr++ = 0;
2753 *hdr++ = 0;
2754 resp->udp_sequence++;
2755 }
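/* Worked example (assuming the usual UDP_MAX_PAYLOAD_SIZE of 1400 and
 * UDP_HEADER_SIZE of 8, so UDP_DATA_SIZE == 1392): a response with
 * tosend == 3000 gives 3000 / 1392 == 2 with a 216-byte remainder, so
 * udp_total == 3 datagrams, with udp_sequence numbering them 0..2. */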
2756
2757 /*
2758  * UDP specific transmit function. Uses its own function rather than checking
2759  * IS_UDP() five times. If we ever implement sendmmsg or similar support they
2760  * will diverge even more.
2761  * Does not use TLS.
2762  *
2763  * Returns:
2764  *   TRANSMIT_COMPLETE   All done writing.
2765  *   TRANSMIT_INCOMPLETE More data remaining to write.
2766  *   TRANSMIT_SOFT_ERROR Can't write any more right now.
2767  *   TRANSMIT_HARD_ERROR Can't write (c->state is reset to conn_read; UDP conns are never closed)
2768 */
2769 static enum transmit_result transmit_udp(conn *c) {
2770 assert(c != NULL);
2771 struct iovec iovs[IOV_MAX];
2772 struct msghdr msg;
2773 mc_resp *resp;
2774 int iovused = 0;
2775 unsigned char udp_hdr[UDP_HEADER_SIZE];
2776
2777 // We only send one UDP packet per call (ugh), so we can only operate on a
2778 // single response at a time.
2779 resp = c->resp_head;
2780
2781 if (!resp) {
2782 return TRANSMIT_COMPLETE;
2783 }
2784
2785 if (resp->skip) {
2786 resp = resp_finish(c, resp);
2787 return TRANSMIT_INCOMPLETE;
2788 }
2789
2790 // clear the message and initialize it.
2791 memset(&msg, 0, sizeof(struct msghdr));
2792 msg.msg_iov = iovs;
2793
2794 // the UDP source to return to.
2795 msg.msg_name = &resp->request_addr;
2796 msg.msg_namelen = resp->request_addr_size;
2797
2798 // First IOV is the custom UDP header.
2799 iovs[0].iov_base = (void *)udp_hdr;
2800 iovs[0].iov_len = UDP_HEADER_SIZE;
2801 build_udp_header(udp_hdr, resp);
2802 iovused++;
2803
2804 // Fill the IOV's the standard way.
2805 // TODO: might get a small speedup if we let it break early with a length
2806 // limit.
2807 iovused = _transmit_pre(c, iovs, iovused, TRANSMIT_ONE_RESP);
2808
2809 // Clip the IOV's to the max UDP packet size.
2810 // If we add support for send_mmsg, this can be where we split msg's.
2811 {
2812 int x = 0;
2813 int len = 0;
2814 for (x = 0; x < iovused; x++) {
2815 if (len + iovs[x].iov_len >= UDP_MAX_PAYLOAD_SIZE) {
2816 iovs[x].iov_len = UDP_MAX_PAYLOAD_SIZE - len;
2817 x++;
2818 break;
2819 } else {
2820 len += iovs[x].iov_len;
2821 }
2822 }
2823 iovused = x;
2824 }
2825
2826 ssize_t res;
2827 msg.msg_iovlen = iovused;
2828 // NOTE: uses system sendmsg since we have no support for indirect UDP.
2829 res = sendmsg(c->sfd, &msg, 0);
2830 if (res >= 0) {
2831 pthread_mutex_lock(&c->thread->stats.mutex);
2832 c->thread->stats.bytes_written += res;
2833 pthread_mutex_unlock(&c->thread->stats.mutex);
2834
2835         // Don't count the UDP header bytes against the response IOV's.
2836 res -= UDP_HEADER_SIZE;
2837
2838 // Decrement any partial IOV's and complete any finished resp's.
2839 _transmit_post(c, res);
2840
2841 if (c->resp_head) {
2842 return TRANSMIT_INCOMPLETE;
2843 } else {
2844 return TRANSMIT_COMPLETE;
2845 }
2846 }
2847
2848 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
2849 if (!update_event(c, EV_WRITE | EV_PERSIST)) {
2850 if (settings.verbose > 0)
2851 fprintf(stderr, "Couldn't update event\n");
2852 conn_set_state(c, conn_closing);
2853 return TRANSMIT_HARD_ERROR;
2854 }
2855 return TRANSMIT_SOFT_ERROR;
2856 }
2857 /* if res == -1 and error is not EAGAIN or EWOULDBLOCK,
2858 we have a real error, on which we close the connection */
2859 if (settings.verbose > 0)
2860 perror("Failed to write, and not due to blocking");
2861
2862 conn_set_state(c, conn_read);
2863 return TRANSMIT_HARD_ERROR;
2864 }
2865
2866
2867 /* Does a looped read to fill data chunks */
2868 /* TODO: restrict number of times this can loop.
2869 * Also, benchmark using readv's.
2870 */
2871 static int read_into_chunked_item(conn *c) {
2872 int total = 0;
2873 int res;
2874 assert(c->rcurr != c->ritem);
2875
2876 while (c->rlbytes > 0) {
2877 item_chunk *ch = (item_chunk *)c->ritem;
2878 if (ch->size == ch->used) {
2879 // FIXME: ch->next is currently always 0. remove this?
2880 if (ch->next) {
2881 c->ritem = (char *) ch->next;
2882 } else {
2883 /* Allocate next chunk. Binary protocol needs 2b for \r\n */
2884 c->ritem = (char *) do_item_alloc_chunk(ch, c->rlbytes +
2885 ((c->protocol == binary_prot) ? 2 : 0));
2886 if (!c->ritem) {
2887 // We failed an allocation. Let caller handle cleanup.
2888 total = -2;
2889 break;
2890 }
2891 // ritem has new chunk, restart the loop.
2892 continue;
2893 //assert(c->rlbytes == 0);
2894 }
2895 }
2896
2897 int unused = ch->size - ch->used;
2898 /* first check if we have leftovers in the conn_read buffer */
2899 if (c->rbytes > 0) {
2900 total = 0;
2901 int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
2902 tocopy = tocopy > unused ? unused : tocopy;
2903 if (c->ritem != c->rcurr) {
2904 memmove(ch->data + ch->used, c->rcurr, tocopy);
2905 }
2906 total += tocopy;
2907 c->rlbytes -= tocopy;
2908 c->rcurr += tocopy;
2909 c->rbytes -= tocopy;
2910 ch->used += tocopy;
2911 if (c->rlbytes == 0) {
2912 break;
2913 }
2914 } else {
2915 /* now try reading from the socket */
2916 res = c->read(c, ch->data + ch->used,
2917 (unused > c->rlbytes ? c->rlbytes : unused));
2918 if (res > 0) {
2919 pthread_mutex_lock(&c->thread->stats.mutex);
2920 c->thread->stats.bytes_read += res;
2921 pthread_mutex_unlock(&c->thread->stats.mutex);
2922 ch->used += res;
2923 total += res;
2924 c->rlbytes -= res;
2925 } else {
2926 /* Reset total to the latest result so caller can handle it */
2927 total = res;
2928 break;
2929 }
2930 }
2931 }
2932
2933 /* At some point I will be able to ditch the \r\n from item storage and
2934 remove all of these kludges.
2935 The above binprot check ensures inline space for \r\n, but if we do
2936 exactly enough allocs there will be no additional chunk for \r\n.
2937 */
2938 if (c->rlbytes == 0 && c->protocol == binary_prot && total >= 0) {
2939 item_chunk *ch = (item_chunk *)c->ritem;
2940 if (ch->size - ch->used < 2) {
2941 c->ritem = (char *) do_item_alloc_chunk(ch, 2);
2942 if (!c->ritem) {
2943 total = -2;
2944 }
2945 }
2946 }
2947 return total;
2948 }
2949
2950 static void drive_machine(conn *c) {
2951 bool stop = false;
2952 int sfd;
2953 socklen_t addrlen;
2954 struct sockaddr_storage addr;
2955 int nreqs = settings.reqs_per_event;
2956 int res;
2957 const char *str;
2958 #ifdef HAVE_ACCEPT4
2959 static int use_accept4 = 1;
2960 #else
2961 static int use_accept4 = 0;
2962 #endif
2963
2964 assert(c != NULL);
2965
2966 while (!stop) {
2967
2968 switch(c->state) {
2969 case conn_listening:
2970 addrlen = sizeof(addr);
2971 #ifdef HAVE_ACCEPT4
2972 if (use_accept4) {
2973 sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);
2974 } else {
2975 sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
2976 }
2977 #else
2978 sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
2979 #endif
2980 if (sfd == -1) {
2981 if (use_accept4 && errno == ENOSYS) {
2982 use_accept4 = 0;
2983 continue;
2984 }
2986 if (errno == EAGAIN || errno == EWOULDBLOCK) {
2987 /* these are transient, so don't log anything */
2988 stop = true;
2989 } else if (errno == EMFILE) {
2990 if (settings.verbose > 0)
2991 fprintf(stderr, "Too many open connections\n");
2992 accept_new_conns(false);
2993 stop = true;
2994 } else {
2995 perror("accept()");
2996 stop = true;
2997 }
2998 break;
2999 }
3000 if (!use_accept4) {
3001 if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) {
3002 perror("setting O_NONBLOCK");
3003 close(sfd);
3004 break;
3005 }
3006 }
3007
3008 bool reject;
3009 if (settings.maxconns_fast) {
3010 reject = sfd >= settings.maxconns - 1;
3011 if (reject) {
3012 STATS_LOCK();
3013 stats.rejected_conns++;
3014 STATS_UNLOCK();
3015 }
3016 } else {
3017 reject = false;
3018 }
3019
3020 if (reject) {
3021 str = "ERROR Too many open connections\r\n";
3022 res = write(sfd, str, strlen(str));
3023 close(sfd);
3024 } else {
3025 void *ssl_v = NULL;
3026 #ifdef TLS
3027 SSL *ssl = NULL;
3028 if (c->ssl_enabled) {
3029 assert(IS_TCP(c->transport) && settings.ssl_enabled);
3030
3031 if (settings.ssl_ctx == NULL) {
3032 if (settings.verbose) {
3033 fprintf(stderr, "SSL context is not initialized\n");
3034 }
3035 close(sfd);
3036 break;
3037 }
3038 SSL_LOCK();
3039 ssl = SSL_new(settings.ssl_ctx);
3040 SSL_UNLOCK();
3041 if (ssl == NULL) {
3042 if (settings.verbose) {
3043 fprintf(stderr, "Failed to created the SSL object\n");
3044 }
3045 close(sfd);
3046 break;
3047 }
3048 SSL_set_fd(ssl, sfd);
3049 int ret = SSL_accept(ssl);
3050 if (ret <= 0) {
3051 int err = SSL_get_error(ssl, ret);
3052 if (err == SSL_ERROR_SYSCALL || err == SSL_ERROR_SSL) {
3053 if (settings.verbose) {
3054 fprintf(stderr, "SSL connection failed with error code : %d : %s\n", err, strerror(errno));
3055 }
3056 SSL_free(ssl);
3057 close(sfd);
3058 STATS_LOCK();
3059 stats.ssl_handshake_errors++;
3060 STATS_UNLOCK();
3061 break;
3062 }
3063 }
3064 }
3065 ssl_v = (void*) ssl;
3066 #endif
3067
3068 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
3069 READ_BUFFER_CACHED, c->transport, ssl_v);
3070 }
3071
3072 stop = true;
3073 break;
3074
3075 case conn_waiting:
3076 rbuf_release(c);
3077 if (!update_event(c, EV_READ | EV_PERSIST)) {
3078 if (settings.verbose > 0)
3079 fprintf(stderr, "Couldn't update event\n");
3080 conn_set_state(c, conn_closing);
3081 break;
3082 }
3083
3084 conn_set_state(c, conn_read);
3085 stop = true;
3086 break;
3087
3088 case conn_read:
3089 if (!IS_UDP(c->transport)) {
3090 // Assign a read buffer if necessary.
3091 if (!rbuf_alloc(c)) {
3092 // TODO: Some way to allow for temporary failures.
3093 conn_set_state(c, conn_closing);
3094 break;
3095 }
3096 res = try_read_network(c);
3097 } else {
3098 // UDP connections always have a static buffer.
3099 res = try_read_udp(c);
3100 }
3101
3102 switch (res) {
3103 case READ_NO_DATA_RECEIVED:
3104 conn_set_state(c, conn_waiting);
3105 break;
3106 case READ_DATA_RECEIVED:
3107 conn_set_state(c, conn_parse_cmd);
3108 break;
3109 case READ_ERROR:
3110 conn_set_state(c, conn_closing);
3111 break;
3112 case READ_MEMORY_ERROR: /* Failed to allocate more memory */
3113 /* State already set by try_read_network */
3114 break;
3115 }
3116 break;
3117
3118 case conn_parse_cmd:
3119 c->noreply = false;
3120 if (c->try_read_command(c) == 0) {
3121 /* we need more data! */
3122 if (c->resp_head) {
3123 // Buffered responses waiting, flush in the meantime.
3124 conn_set_state(c, conn_mwrite);
3125 } else {
3126 conn_set_state(c, conn_waiting);
3127 }
3128 }
3129
3130 break;
3131
3132 case conn_new_cmd:
3133 /* Only process nreqs at a time to avoid starving other
3134 connections */
3135
3136 --nreqs;
3137 if (nreqs >= 0) {
3138 reset_cmd_handler(c);
3139 } else if (c->resp_head) {
3140 // flush response pipe on yield.
3141 conn_set_state(c, conn_mwrite);
3142 } else {
3143 pthread_mutex_lock(&c->thread->stats.mutex);
3144 c->thread->stats.conn_yields++;
3145 pthread_mutex_unlock(&c->thread->stats.mutex);
3146 if (c->rbytes > 0) {
3147                 /* We have already read data into the input buffer,
3148                    so libevent will most likely not signal read events
3149                    on the socket (unless more data is available). As a
3150                    hack we should just put in a request to write data,
3151                    because that should be possible ;-)
3152                 */
3153 if (!update_event(c, EV_WRITE | EV_PERSIST)) {
3154 if (settings.verbose > 0)
3155 fprintf(stderr, "Couldn't update event\n");
3156 conn_set_state(c, conn_closing);
3157 break;
3158 }
3159 }
3160 stop = true;
3161 }
3162 break;
3163
3164 case conn_nread:
3165 if (c->rlbytes == 0) {
3166 complete_nread(c);
3167 break;
3168 }
3169
3170             /* Check if rlbytes < 0, to prevent crash */
3171 if (c->rlbytes < 0) {
3172 if (settings.verbose) {
3173 fprintf(stderr, "Invalid rlbytes to read: len %d\n", c->rlbytes);
3174 }
3175 conn_set_state(c, conn_closing);
3176 break;
3177 }
3178
3179 if (c->item_malloced || ((((item *)c->item)->it_flags & ITEM_CHUNKED) == 0) ) {
3180 /* first check if we have leftovers in the conn_read buffer */
3181 if (c->rbytes > 0) {
3182 int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
3183 memmove(c->ritem, c->rcurr, tocopy);
3184 c->ritem += tocopy;
3185 c->rlbytes -= tocopy;
3186 c->rcurr += tocopy;
3187 c->rbytes -= tocopy;
3188 if (c->rlbytes == 0) {
3189 break;
3190 }
3191 }
3192
3193 /* now try reading from the socket */
3194 res = c->read(c, c->ritem, c->rlbytes);
3195 if (res > 0) {
3196 pthread_mutex_lock(&c->thread->stats.mutex);
3197 c->thread->stats.bytes_read += res;
3198 pthread_mutex_unlock(&c->thread->stats.mutex);
3199 if (c->rcurr == c->ritem) {
3200 c->rcurr += res;
3201 }
3202 c->ritem += res;
3203 c->rlbytes -= res;
3204 break;
3205 }
3206 } else {
3207 res = read_into_chunked_item(c);
3208 if (res > 0)
3209 break;
3210 }
3211
3212 if (res == 0) { /* end of stream */
3213 c->close_reason = NORMAL_CLOSE;
3214 conn_set_state(c, conn_closing);
3215 break;
3216 }
3217
3218 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
3219 if (!update_event(c, EV_READ | EV_PERSIST)) {
3220 if (settings.verbose > 0)
3221 fprintf(stderr, "Couldn't update event\n");
3222 conn_set_state(c, conn_closing);
3223 break;
3224 }
3225 stop = true;
3226 break;
3227 }
3228
3229 /* Memory allocation failure */
3230 if (res == -2) {
3231 out_of_memory(c, "SERVER_ERROR Out of memory during read");
3232 c->sbytes = c->rlbytes;
3233 conn_set_state(c, conn_swallow);
3234 // Ensure this flag gets cleared. It gets killed on conn_new()
3235 // so any conn_closing is fine, calling complete_nread is
3236                 // fine. This swallow seems to be the only other case.
3237 c->set_stale = false;
3238 c->mset_res = false;
3239 break;
3240 }
3241 /* otherwise we have a real error, on which we close the connection */
3242 if (settings.verbose > 0) {
3243 fprintf(stderr, "Failed to read, and not due to blocking:\n"
3244 "errno: %d %s \n"
3245 "rcurr=%p ritem=%p rbuf=%p rlbytes=%d rsize=%d\n",
3246 errno, strerror(errno),
3247 (void *)c->rcurr, (void *)c->ritem, (void *)c->rbuf,
3248 (int)c->rlbytes, (int)c->rsize);
3249 }
3250 conn_set_state(c, conn_closing);
3251 break;
3252
3253 case conn_swallow:
3254 /* we are reading sbytes and throwing them away */
3255 if (c->sbytes <= 0) {
3256 conn_set_state(c, conn_new_cmd);
3257 break;
3258 }
3259
3260 /* first check if we have leftovers in the conn_read buffer */
3261 if (c->rbytes > 0) {
3262 int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
3263 c->sbytes -= tocopy;
3264 c->rcurr += tocopy;
3265 c->rbytes -= tocopy;
3266 break;
3267 }
3268
3269 /* now try reading from the socket */
3270 res = c->read(c, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
3271 if (res > 0) {
3272 pthread_mutex_lock(&c->thread->stats.mutex);
3273 c->thread->stats.bytes_read += res;
3274 pthread_mutex_unlock(&c->thread->stats.mutex);
3275 c->sbytes -= res;
3276 break;
3277 }
3278 if (res == 0) { /* end of stream */
3279 c->close_reason = NORMAL_CLOSE;
3280 conn_set_state(c, conn_closing);
3281 break;
3282 }
3283 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
3284 if (!update_event(c, EV_READ | EV_PERSIST)) {
3285 if (settings.verbose > 0)
3286 fprintf(stderr, "Couldn't update event\n");
3287 conn_set_state(c, conn_closing);
3288 break;
3289 }
3290 stop = true;
3291 break;
3292 }
3293 /* otherwise we have a real error, on which we close the connection */
3294 if (settings.verbose > 0)
3295 fprintf(stderr, "Failed to read, and not due to blocking\n");
3296 conn_set_state(c, conn_closing);
3297 break;
3298
3299 case conn_write:
3300 case conn_mwrite:
3301 /* have side IO's that must process before transmit() can run.
3302 * remove the connection from the worker thread and dispatch the
3303 * IO queue
3304 */
3305 assert(c->io_queues_submitted == 0);
3306
3307 for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {
3308 if (q->stack_ctx != NULL) {
3309 io_queue_cb_t *qcb = thread_io_queue_get(c->thread, q->type);
3310 qcb->submit_cb(q);
3311 c->io_queues_submitted++;
3312 }
3313 }
3314 if (c->io_queues_submitted != 0) {
3315 conn_set_state(c, conn_io_queue);
3316 event_del(&c->event);
3317
3318 stop = true;
3319 break;
3320 }
3321
3322 switch (!IS_UDP(c->transport) ? transmit(c) : transmit_udp(c)) {
3323 case TRANSMIT_COMPLETE:
3324 if (c->state == conn_mwrite) {
3325 // Free up IO wraps and any half-uploaded items.
3326 conn_release_items(c);
3327 conn_set_state(c, conn_new_cmd);
3328 if (c->close_after_write) {
3329 conn_set_state(c, conn_closing);
3330 }
3331 } else {
3332 if (settings.verbose > 0)
3333 fprintf(stderr, "Unexpected state %d\n", c->state);
3334 conn_set_state(c, conn_closing);
3335 }
3336 break;
3337
3338 case TRANSMIT_INCOMPLETE:
3339 case TRANSMIT_HARD_ERROR:
3340 break; /* Continue in state machine. */
3341
3342 case TRANSMIT_SOFT_ERROR:
3343 stop = true;
3344 break;
3345 }
3346 break;
3347
3348 case conn_closing:
3349 if (IS_UDP(c->transport))
3350 conn_cleanup(c);
3351 else
3352 conn_close(c);
3353 stop = true;
3354 break;
3355
3356 case conn_closed:
3357 /* This only happens if dormando is an idiot. */
3358 abort();
3359 break;
3360
3361 case conn_watch:
3362 /* We handed off our connection to the logger thread. */
3363 stop = true;
3364 break;
3365 case conn_io_queue:
3366 /* Complete our queued IO's from within the worker thread. */
3367 conn_io_queue_complete(c);
3368 conn_set_state(c, conn_mwrite);
3369 break;
3370 case conn_max_state:
3371 assert(false);
3372 break;
3373 }
3374 }
3375
3376 return;
3377 }
3378
3379 void event_handler(const evutil_socket_t fd, const short which, void *arg) {
3380 conn *c;
3381
3382 c = (conn *)arg;
3383 assert(c != NULL);
3384
3385 c->which = which;
3386
3387 /* sanity */
3388 if (fd != c->sfd) {
3389 if (settings.verbose > 0)
3390 fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
3391 conn_close(c);
3392 return;
3393 }
3394
3395 drive_machine(c);
3396
3397 /* wait for next event */
3398 return;
3399 }
3400
3401 static int new_socket(struct addrinfo *ai) {
3402 int sfd;
3403 int flags;
3404
3405 if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
3406 return -1;
3407 }
3408
3409 if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
3410 fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
3411 perror("setting O_NONBLOCK");
3412 close(sfd);
3413 return -1;
3414 }
3415 return sfd;
3416 }
3417
3418
3419 /*
3420 * Sets a socket's send buffer size to the maximum allowed by the system.
3421 */
3422 static void maximize_sndbuf(const int sfd) {
3423 socklen_t intsize = sizeof(int);
3424 int last_good = 0;
3425 int min, max, avg;
3426 int old_size;
3427
3428 /* Start with the default size. */
3429 #ifdef _WIN32
3430 if (getsockopt((SOCKET)sfd, SOL_SOCKET, SO_SNDBUF, (char *)&old_size, &intsize) != 0) {
3431 #else
3432 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
3433 #endif /* #ifdef _WIN32 */
3434 if (settings.verbose > 0)
3435 perror("getsockopt(SO_SNDBUF)");
3436 return;
3437 }
3438
3439 /* Binary-search for the real maximum. */
3440 min = old_size;
3441 max = MAX_SENDBUF_SIZE;
3442
3443 while (min <= max) {
3444 avg = ((unsigned int)(min + max)) / 2;
3445 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
3446 last_good = avg;
3447 min = avg + 1;
3448 } else {
3449 max = avg - 1;
3450 }
3451 }
3452
3453 if (settings.verbose > 1)
3454 fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
3455 }
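/* Worked example (numbers illustrative): if the default SO_SNDBUF is
 * reported as 208KB and the kernel rejects anything above 4MB, each
 * accepted setsockopt() raises min and each rejected one lowers max, so
 * last_good converges on the largest accepted size in O(log n) probes.
 * On kernels that silently clamp oversized requests instead of failing
 * them, the loop simply ends at MAX_SENDBUF_SIZE. */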
3456
3457 /**
3458 * Create a socket and bind it to a specific port number
3459 * @param interface the interface to bind to
3460 * @param port the port number to bind to
3461 * @param transport the transport protocol (TCP / UDP)
3462 * @param portnumber_file A filepointer to write the port numbers to
3463 * when they are successfully added to the list of ports we
3464 * listen on.
3465 */
3466 static int server_socket(const char *interface,
3467 int port,
3468 enum network_transport transport,
3469 FILE *portnumber_file, bool ssl_enabled) {
3470 int sfd;
3471 struct linger ling = {0, 0};
3472 struct addrinfo *ai;
3473 struct addrinfo *next;
3474 struct addrinfo hints = { .ai_flags = AI_PASSIVE,
3475 .ai_family = AF_UNSPEC };
3476 char port_buf[NI_MAXSERV];
3477 int error;
3478 int success = 0;
3479     int flags = 1;
3480
3481 hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;
3482
3483 if (port == -1) {
3484 port = 0;
3485 }
3486 snprintf(port_buf, sizeof(port_buf), "%d", port);
3487     error = getaddrinfo(interface, port_buf, &hints, &ai);
3488 if (error != 0) {
3489 if (error != EAI_SYSTEM)
3490 fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
3491 else
3492 perror("getaddrinfo()");
3493 return 1;
3494 }
3495
3496     for (next = ai; next; next = next->ai_next) {
3497 conn *listen_conn_add;
3498 if ((sfd = new_socket(next)) == -1) {
3499 /* getaddrinfo can return "junk" addresses,
3500 * we make sure at least one works before erroring.
3501 */
3502 if (errno == EMFILE) {
3503 /* ...unless we're out of fds */
3504 perror("server_socket");
3505 exit(EX_OSERR);
3506 }
3507 continue;
3508 }
3509
3510 if (settings.num_napi_ids) {
3511 socklen_t len = sizeof(socklen_t);
3512 int napi_id;
3513 error = getsockopt(sfd, SOL_SOCKET, SO_INCOMING_NAPI_ID, &napi_id, &len);
3514 if (error != 0) {
3515 fprintf(stderr, "-N <num_napi_ids> option not supported\n");
3516 exit(EXIT_FAILURE);
3517 }
3518 }
3519
3520 #ifdef IPV6_V6ONLY
3521 if (next->ai_family == AF_INET6) {
3522 error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags, sizeof(flags));
3523 if (error != 0) {
3524 perror("setsockopt");
3525 close(sfd);
3526 continue;
3527 }
3528 }
3529 #endif
3530
3531 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
3532 if (IS_UDP(transport)) {
3533 maximize_sndbuf(sfd);
3534 } else {
3535 error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
3536 if (error != 0)
3537 perror("setsockopt");
3538
3539 error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
3540 if (error != 0)
3541 perror("setsockopt");
3542
3543 error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
3544 if (error != 0)
3545 perror("setsockopt");
3546 }
3547
3548 if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
3549 if (errno != EADDRINUSE) {
3550 perror("bind()");
3551 close(sfd);
3552 freeaddrinfo(ai);
3553 return 1;
3554 }
3555 close(sfd);
3556 continue;
3557 } else {
3558 success++;
3559 if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
3560 perror("listen()");
3561 close(sfd);
3562 freeaddrinfo(ai);
3563 return 1;
3564 }
3565 if (portnumber_file != NULL &&
3566 (next->ai_addr->sa_family == AF_INET ||
3567 next->ai_addr->sa_family == AF_INET6)) {
3568 union {
3569 struct sockaddr_in in;
3570 struct sockaddr_in6 in6;
3571 } my_sockaddr;
3572 socklen_t len = sizeof(my_sockaddr);
3573 if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len) == 0) {
3574 if (next->ai_addr->sa_family == AF_INET) {
3575 fprintf(portnumber_file, "%s INET: %u\n",
3576 IS_UDP(transport) ? "UDP" : "TCP",
3577 ntohs(my_sockaddr.in.sin_port));
3578 } else {
3579 fprintf(portnumber_file, "%s INET6: %u\n",
3580 IS_UDP(transport) ? "UDP" : "TCP",
3581 ntohs(my_sockaddr.in6.sin6_port));
3582 }
3583 }
3584 }
3585 }
3586
3587 if (IS_UDP(transport)) {
3588 int c;
3589
3590 for (c = 0; c < settings.num_threads_per_udp; c++) {
3591 /* Allocate one UDP file descriptor per worker thread;
3592 * this allows "stats conns" to separately list multiple
3593 * parallel UDP requests in progress.
3594 *
3595 * The dispatch code round-robins new connection requests
3596 * among threads, so this is guaranteed to assign one
3597 * FD to each thread.
3598 */
3599 int per_thread_fd;
3600 if (c == 0) {
3601 per_thread_fd = sfd;
3602 } else {
3603 per_thread_fd = dup(sfd);
3604 if (per_thread_fd < 0) {
3605 perror("Failed to duplicate file descriptor");
3606 exit(EXIT_FAILURE);
3607 }
3608 }
3609 dispatch_conn_new(per_thread_fd, conn_read,
3610 EV_READ | EV_PERSIST,
3611 UDP_READ_BUFFER_SIZE, transport, NULL);
3612 }
3613 } else {
3614 if (!(listen_conn_add = conn_new(sfd, conn_listening,
3615 EV_READ | EV_PERSIST, 1,
3616 transport, main_base, NULL))) {
3617 fprintf(stderr, "failed to create listening connection\n");
3618 exit(EXIT_FAILURE);
3619 }
3620 #ifdef TLS
3621 listen_conn_add->ssl_enabled = ssl_enabled;
3622 #else
3623 assert(ssl_enabled == false);
3624 #endif
3625 listen_conn_add->next = listen_conn;
3626 listen_conn = listen_conn_add;
3627 }
3628 }
3629
3630 freeaddrinfo(ai);
3631
3632 /* Return zero iff we detected no errors in starting up connections */
3633 return success == 0;
3634 }
3635
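/* Open a listening socket for every address in settings.inter (a single
* wildcard socket when it is NULL). The list is ';' or ',' separated and each
* entry may be a bare address, "host:port", an RFC3986-style "[v6addr]:port",
* or "*" for all interfaces; TLS builds also accept a "notls:" prefix.
* An illustrative (hypothetical) value for the corresponding -l flag:
* -l 127.0.0.1:11211,[::1]:11212,notls:10.0.0.1:11213
*/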
3636 static int server_sockets(int port, enum network_transport transport,
3637 FILE *portnumber_file) {
3638 bool ssl_enabled = false;
3639
3640 #ifdef TLS
3641 const char *notls = "notls";
3642 ssl_enabled = settings.ssl_enabled;
3643 #endif
3644
3645 if (settings.inter == NULL) {
3646 return server_socket(settings.inter, port, transport, portnumber_file, ssl_enabled);
3647 } else {
3648 // tokenize the list and bind to each address in it..
3649 char *b;
3650 int ret = 0;
3651 char *list = strdup(settings.inter);
3652
3653 if (list == NULL) {
3654 fprintf(stderr, "Failed to allocate memory for parsing server interface string\n");
3655 return 1;
3656 }
3657 // If we encounter any failure, preserve the first errno for the caller.
3658 int errno_save = 0;
3659 for (char *p = strtok_r(list, ";,", &b);
3660 p != NULL;
3661 p = strtok_r(NULL, ";,", &b)) {
3662 int the_port = port;
3663 #ifdef TLS
3664 ssl_enabled = settings.ssl_enabled;
3665 // "notls" option is valid only when memcached is run with SSL enabled.
3666 if (strncmp(p, notls, strlen(notls)) == 0) {
3667 if (!settings.ssl_enabled) {
3668 fprintf(stderr, "'notls' option is valid only when SSL is enabled\n");
3669 free(list);
3670 return 1;
3671 }
3672 ssl_enabled = false;
3673 p += strlen(notls) + 1;
3674 }
3675 #endif
3676
3677 char *h = NULL;
3678 if (*p == '[') {
3679 // expecting it to be an IPv6 address enclosed in []
3680 // i.e. RFC3986 style recommended by RFC5952
3681 char *e = strchr(p, ']');
3682 if (e == NULL) {
3683 fprintf(stderr, "Invalid IPV6 address: \"%s\"", p);
3684 free(list);
3685 return 1;
3686 }
3687 h = ++p; // skip the opening '['
3688 *e = '\0';
3689 p = ++e; // skip the closing ']'
3690 }
3691
3692 char *s = strchr(p, ':');
3693 if (s != NULL) {
3694 // If there are no further colons, treat what follows as a port number.
3695 // Otherwise the only valid form is an unenclosed IPv6 address without
3696 // a port - unless an RFC3986-style address was already parsed above,
3697 // in which case there is no good option but to let it fail as a port number.
3698 if (strchr(s + 1, ':') == NULL || h != NULL) {
3699 *s = '\0';
3700 ++s;
3701 if (!safe_strtol(s, &the_port)) {
3702 fprintf(stderr, "Invalid port number: \"%s\"", s);
3703 free(list);
3704 return 1;
3705 }
3706 }
3707 }
3708
3709 if (h != NULL)
3710 p = h;
3711
3712 if (strcmp(p, "*") == 0) {
3713 p = NULL;
3714 }
3715 ret |= server_socket(p, the_port, transport, portnumber_file, ssl_enabled);
3716 if (ret != 0 && errno_save == 0) errno_save = errno;
3717 }
3718 free(list);
3719 errno = errno_save;
3720 return ret;
3721 }
3722 }
3723
3724 #ifndef DISABLE_UNIX_SOCKET
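/* Create a non-blocking AF_UNIX stream socket; returns the fd, or -1 on error. */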
3725 static int new_socket_unix(void) {
3726 int sfd;
3727 int flags;
3728
3729 if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
3730 perror("socket()");
3731 return -1;
3732 }
3733
3734 if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
3735 fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
3736 perror("setting O_NONBLOCK");
3737 close(sfd);
3738 return -1;
3739 }
3740 return sfd;
3741 }
3742
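/* Bind and listen on a UNIX domain socket at `path`, creating it with the
* permissions given by access_mask (applied via umask). A stale socket file
* left over from a previous run is removed first. Returns 0 on success, 1 on
* failure. */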
3743 static int server_socket_unix(const char *path, int access_mask) {
3744 int sfd;
3745 struct linger ling = {0, 0};
3746 struct sockaddr_un addr;
3747 struct stat tstat;
3748 int flags = 1;
3749 int old_umask;
3750
3751 if (!path) {
3752 return 1;
3753 }
3754
3755 if ((sfd = new_socket_unix()) == -1) {
3756 return 1;
3757 }
3758
3759 /*
3760 * Clean up a previous socket file if we left it around
3761 */
3762 if (lstat(path, &tstat) == 0) {
3763 if (S_ISSOCK(tstat.st_mode))
3764 unlink(path);
3765 }
3766
3767 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
3768 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
3769 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
3770
3771 /*
3772 * the memset call clears nonstandard fields in some implementations
3773 * that otherwise mess things up.
3774 */
3775 memset(&addr, 0, sizeof(addr));
3776
3777 addr.sun_family = AF_UNIX;
3778 strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);
3779 assert(strcmp(addr.sun_path, path) == 0);
3780 old_umask = umask(~(access_mask & 0777));
3781 if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
3782 perror("bind()");
3783 close(sfd);
3784 umask(old_umask);
3785 return 1;
3786 }
3787 umask(old_umask);
3788 if (listen(sfd, settings.backlog) == -1) {
3789 perror("listen()");
3790 close(sfd);
3791 return 1;
3792 }
3793 if (!(listen_conn = conn_new(sfd, conn_listening,
3794 EV_READ | EV_PERSIST, 1,
3795 local_transport, main_base, NULL))) {
3796 fprintf(stderr, "failed to create listening connection\n");
3797 exit(EXIT_FAILURE);
3798 }
3799
3800 return 0;
3801 }
3802 #else
3803 #define server_socket_unix(path, access_mask) -1
3804 #endif /* #ifndef DISABLE_UNIX_SOCKET */
3805
3806 /*
3807 * We keep the current time of day in a global variable that's updated by a
3808 * timer event. This saves us a bunch of time() system calls (we really only
3809 * need to get the time once a second, whereas there can be tens of thousands
3810 * of requests a second) and allows us to use server-start-relative timestamps
3811 * rather than absolute UNIX timestamps, a space savings on systems where
3812 * sizeof(time_t) > sizeof(unsigned int).
3813 */
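/* A sketch of the arithmetic (illustrative, not code from this file):
*
* rel_time_t now_rel = current_time; // ticked once a second below
* time_t now_abs = process_started + now_rel; // recover absolute UNIX time
*
* clock_handler() keeps current_time at the number of seconds elapsed since
* startup, so absolute timestamps are recovered by adding process_started. */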
3814 volatile rel_time_t current_time;
3815 static struct event clockevent;
3816 #ifdef MEMCACHED_DEBUG
3817 volatile bool is_paused;
3818 volatile int64_t delta;
3819 #endif
3820 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
3821 static bool monotonic = false;
3822 static int64_t monotonic_start;
3823 #endif
3824
3825 /* libevent uses a monotonic clock when available for event scheduling. Aside
3826 * from jitter, simply ticking our internal timer here is accurate enough.
3827 * Note that users who are setting explicit dates for expiration times *must*
3828 * ensure their clocks are correct before starting memcached. */
3829 static void clock_handler(const evutil_socket_t fd, const short which, void *arg) {
3830 struct timeval t = {.tv_sec = 1, .tv_usec = 0};
3831 static bool initialized = false;
3832
3833 if (initialized) {
3834 /* only delete the event if it's actually there. */
3835 evtimer_del(&clockevent);
3836 } else {
3837 initialized = true;
3838 }
3839
3840 // While we're here, check for hash table expansion.
3841 // This function should be quick to avoid delaying the timer.
3842 assoc_start_expand(stats_state.curr_items);
3843 // also, if HUP'ed we need to do some maintenance.
3844 // for now that's just the authfile reload.
3845 if (settings.sig_hup) {
3846 settings.sig_hup = false;
3847
3848 authfile_load(settings.auth_file);
3849 #ifdef PROXY
3850 if (settings.proxy_ctx) {
3851 proxy_start_reload(settings.proxy_ctx);
3852 }
3853 #endif
3854 }
3855
3856 evtimer_set(&clockevent, clock_handler, 0);
3857 event_base_set(main_base, &clockevent);
3858 evtimer_add(&clockevent, &t);
3859
3860 #ifdef MEMCACHED_DEBUG
3861 if (is_paused) return;
3862 #endif
3863
3864 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
3865 if (monotonic) {
3866 struct timespec ts;
3867 if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1)
3868 return;
3869 #ifdef MEMCACHED_DEBUG
3870 current_time = (rel_time_t) (ts.tv_sec - monotonic_start + delta);
3871 #else
3872 current_time = (rel_time_t) (ts.tv_sec - monotonic_start);
3873 #endif
3874 return;
3875 }
3876 #endif
3877 {
3878 struct timeval tv;
3879 gettimeofday(&tv, NULL);
3880 #ifdef MEMCACHED_DEBUG
3881 current_time = (rel_time_t) (tv.tv_sec - process_started + delta);
3882 #else
3883 current_time = (rel_time_t) (tv.tv_sec - process_started);
3884 #endif
3885 }
3886 }
3887
3888 static const char* flag_enabled_disabled(bool flag) {
3889 return (flag ? "enabled" : "disabled");
3890 }
3891
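/* Abort at startup if a compiled-in default has drifted from the value that
* the help text in usage() still claims. */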
3892 static void verify_default(const char* param, bool condition) {
3893 if (!condition) {
3894 printf("Default value of [%s] has changed."
3895 " Modify the help text and default value check.\n", param);
3896 exit(EXIT_FAILURE);
3897 }
3898 }
3899
3900 static void usage(void) {
3901 printf(PACKAGE " " VERSION "\n");
3902 printf("-p, --port=<num> TCP port to listen on (default: %d)\n"
3903 "-U, --udp-port=<num> UDP port to listen on (default: %d, off)\n",
3904 settings.port, settings.udpport);
3905 #ifndef DISABLE_UNIX_SOCKET
3906 printf("-s, --unix-socket=<file> UNIX socket to listen on (disables network support)\n");
3907 printf("-a, --unix-mask=<mask> access mask for UNIX socket, in octal (default: %o)\n",
3908 settings.access);
3909 #endif /* #ifndef DISABLE_UNIX_SOCKET */
3910 printf("-A, --enable-shutdown enable ascii \"shutdown\" command\n");
3911 printf("-l, --listen=<addr> interface to listen on (default: INADDR_ANY)\n");
3912 #ifdef TLS
3913 printf(" if TLS/SSL is enabled, 'notls' prefix can be used to\n"
3914 " disable for specific listeners (-l notls:<ip>:<port>) \n");
3915 #endif
3916 printf("-d, --daemon run as a daemon\n"
3917 "-r, --enable-coredumps maximize core file limit\n"
3918 "-u, --user=<user> assume identity of <username> (only when run as root)\n"
3919 "-m, --memory-limit=<num> item memory in megabytes (default: %lu)\n"
3920 "-M, --disable-evictions return error on memory exhausted instead of evicting\n"
3921 "-c, --conn-limit=<num> max simultaneous connections (default: %d)\n"
3922 "-k, --lock-memory lock down all paged memory\n"
3923 "-v, --verbose verbose (print errors/warnings while in event loop)\n"
3924 "-vv very verbose (also print client commands/responses)\n"
3925 "-vvv extremely verbose (internal state transitions)\n"
3926 "-h, --help print this help and exit\n"
3927 "-i, --license print memcached and libevent license\n"
3928 "-V, --version print version and exit\n"
3929 "-P, --pidfile=<file> save PID in <file>, only used with -d option\n"
3930 "-f, --slab-growth-factor=<num> chunk size growth factor (default: %2.2f)\n"
3931 "-n, --slab-min-size=<bytes> min space used for key+value+flags (default: %d)\n",
3932 (unsigned long) settings.maxbytes / (1 << 20),
3933 settings.maxconns, settings.factor, settings.chunk_size);
3934 verify_default("udp-port",settings.udpport == 0);
3935 printf("-L, --enable-largepages try to use large memory pages (if available)\n");
3936 printf("-D <char> Use <char> as the delimiter between key prefixes and IDs.\n"
3937 " This is used for per-prefix stats reporting. The default is\n"
3938 " \"%c\" (colon). If this option is specified, stats collection\n"
3939 " is turned on automatically; if not, then it may be turned on\n"
3940 " by sending the \"stats detail on\" command to the server.\n",
3941 settings.prefix_delimiter);
3942 printf("-t, --threads=<num> number of threads to use (default: %d)\n", settings.num_threads);
3943 printf("-R, --max-reqs-per-event maximum number of requests per event, limits the\n"
3944 " requests processed per connection to prevent \n"
3945 " starvation (default: %d)\n", settings.reqs_per_event);
3946 printf("-C, --disable-cas disable use of CAS\n");
3947 printf("-b, --listen-backlog=<num> set the backlog queue limit (default: %d)\n", settings.backlog);
3948 printf("-B, --protocol=<name> protocol - one of ascii, binary, or auto (default: %s)\n",
3949 prot_text(settings.binding_protocol));
3950 printf("-I, --max-item-size=<num> adjusts max item size\n"
3951 " (default: %dm, min: %dk, max: %dm)\n",
3952 settings.item_size_max / (1 << 20), ITEM_SIZE_MAX_LOWER_LIMIT / (1 << 10), ITEM_SIZE_MAX_UPPER_LIMIT / (1 << 20));
3953 #ifdef ENABLE_SASL
3954 printf("-S, --enable-sasl turn on Sasl authentication\n");
3955 #endif
3956 printf("-F, --disable-flush-all disable flush_all command\n");
3957 printf("-X, --disable-dumping disable stats cachedump and lru_crawler metadump\n");
3958 printf("-W --disable-watch disable watch commands (live logging)\n");
3959 printf("-Y, --auth-file=<file> (EXPERIMENTAL) enable ASCII protocol authentication. format:\n"
3960 " user:pass\\nuser2:pass2\\n\n");
3961 printf("-e, --memory-file=<file> (EXPERIMENTAL) mmap a file for item memory.\n"
3962 " use only in ram disks or persistent memory mounts!\n"
3963 " enables restartable cache (stop with SIGUSR1)\n");
3964 #ifdef TLS
3965 printf("-Z, --enable-ssl enable TLS/SSL\n");
3966 #endif
3967 printf("-o, --extended comma separated list of extended options\n"
3968 " most options have a 'no_' prefix to disable\n"
3969 " - maxconns_fast: immediately close new connections after limit (default: %s)\n"
3970 " - hashpower: an integer multiplier for how large the hash\n"
3971 " table should be. normally grows at runtime. (default starts at: %d)\n"
3972 " set based on \"STAT hash_power_level\"\n"
3973 " - tail_repair_time: time in seconds for how long to wait before\n"
3974 " forcefully killing LRU tail item.\n"
3975 " disabled by default; very dangerous option.\n"
3976 " - hash_algorithm: the hash table algorithm\n"
3977 " default is murmur3 hash. options: jenkins, murmur3, xxh3\n"
3978 " - no_lru_crawler: disable LRU Crawler background thread.\n"
3979 " - lru_crawler_sleep: microseconds to sleep between items\n"
3980 " default is %d.\n"
3981 " - lru_crawler_tocrawl: max items to crawl per slab per run\n"
3982 " default is %u (unlimited)\n",
3983 flag_enabled_disabled(settings.maxconns_fast), settings.hashpower_init,
3984 settings.lru_crawler_sleep, settings.lru_crawler_tocrawl);
3985 printf(" - read_buf_mem_limit: limit in megabytes for connection read/response buffers.\n"
3986 " do not adjust unless you have high (20k+) conn. limits.\n"
3987 " 0 means unlimited (default: %u)\n",
3988 settings.read_buf_mem_limit);
3989 verify_default("read_buf_mem_limit", settings.read_buf_mem_limit == 0);
3990 printf(" - no_lru_maintainer: disable new LRU system + background thread.\n"
3991 " - hot_lru_pct: pct of slab memory to reserve for hot lru.\n"
3992 " (requires lru_maintainer, default pct: %d)\n"
3993 " - warm_lru_pct: pct of slab memory to reserve for warm lru.\n"
3994 " (requires lru_maintainer, default pct: %d)\n"
3995 " - hot_max_factor: items idle > cold lru age * drop from hot lru. (default: %.2f)\n"
3996 " - warm_max_factor: items idle > cold lru age * this drop from warm. (default: %.2f)\n"
3997 " - temporary_ttl: TTL's below get separate LRU, can't be evicted.\n"
3998 " (requires lru_maintainer, default: %d)\n"
3999 " - idle_timeout: timeout for idle connections. (default: %d, no timeout)\n",
4000 settings.hot_lru_pct, settings.warm_lru_pct, settings.hot_max_factor, settings.warm_max_factor,
4001 settings.temporary_ttl, settings.idle_timeout);
4002 printf(" - slab_chunk_max: (EXPERIMENTAL) maximum slab size in kilobytes. use extreme care. (default: %d)\n"
4003 " - watcher_logbuf_size: size in kilobytes of per-watcher write buffer. (default: %u)\n"
4004 " - worker_logbuf_size: size in kilobytes of per-worker-thread buffer\n"
4005 " read by background thread, then written to watchers. (default: %u)\n"
4006 " - track_sizes: enable dynamic reports for 'stats sizes' command.\n"
4007 " - no_hashexpand: disables hash table expansion (dangerous)\n"
4008 " - modern: enables options which will be default in future.\n"
4009 " currently: nothing\n"
4010 " - no_modern: uses defaults of previous major version (1.4.x)\n",
4011 settings.slab_chunk_size_max / (1 << 10), settings.logger_watcher_buf_size / (1 << 10),
4012 settings.logger_buf_size / (1 << 10));
4013 verify_default("tail_repair_time", settings.tail_repair_time == TAIL_REPAIR_TIME_DEFAULT);
4014 verify_default("lru_crawler_tocrawl", settings.lru_crawler_tocrawl == 0);
4015 verify_default("idle_timeout", settings.idle_timeout == 0);
4016 #ifdef HAVE_DROP_PRIVILEGES
4017 printf(" - drop_privileges: enable dropping extra syscall privileges\n"
4018 " - no_drop_privileges: disable drop_privileges in case it causes issues with\n"
4019 " some customisation.\n"
4020 " (default is no_drop_privileges)\n");
4021 verify_default("drop_privileges", !settings.drop_privileges);
4022 #ifdef MEMCACHED_DEBUG
4023 printf(" - relaxed_privileges: running tests requires extra privileges. (default: %s)\n",
4024 flag_enabled_disabled(settings.relaxed_privileges));
4025 #endif
4026 #endif
4027 #ifdef EXTSTORE
4028 printf("\n - External storage (ext_*) related options (see: https://memcached.org/extstore)\n");
4029 printf(" - ext_path: file to write to for external storage.\n"
4030 " ie: ext_path=/mnt/d1/extstore:1G\n"
4031 " - ext_page_size: size in megabytes of storage pages. (default: %u)\n"
4032 " - ext_wbuf_size: size in megabytes of page write buffers. (default: %u)\n"
4033 " - ext_threads: number of IO threads to run. (default: %u)\n"
4034 " - ext_item_size: store items larger than this (bytes, default %u)\n"
4035 " - ext_item_age: store items idle at least this long (seconds, default: no age limit)\n"
4036 " - ext_low_ttl: consider TTLs lower than this specially (default: %u)\n"
4037 " - ext_drop_unread: don't re-write unread values during compaction (default: %s)\n"
4038 " - ext_recache_rate: recache an item every N accesses (default: %u)\n"
4039 " - ext_compact_under: compact when fewer than this many free pages\n"
4040 " (default: 1/4th of the assigned storage)\n"
4041 " - ext_drop_under: drop COLD items when fewer than this many free pages\n"
4042 " (default: 1/4th of the assigned storage)\n"
4043 " - ext_max_frag: max page fragmentation to tolerate (default: %.2f)\n"
4044 " - ext_max_sleep: max sleep time of background threads in us (default: %u)\n"
4045 " - slab_automove_freeratio: ratio of memory to hold free as buffer.\n"
4046 " (see doc/storage.txt for more info, default: %.3f)\n",
4047 settings.ext_page_size / (1 << 20), settings.ext_wbuf_size / (1 << 20), settings.ext_io_threadcount,
4048 settings.ext_item_size, settings.ext_low_ttl,
4049 flag_enabled_disabled(settings.ext_drop_unread), settings.ext_recache_rate,
4050 settings.ext_max_frag, settings.ext_max_sleep, settings.slab_automove_freeratio);
4051 verify_default("ext_item_age", settings.ext_item_age == UINT_MAX);
4052 #endif
4053 #ifdef PROXY
4054 printf(" - proxy_config: path to lua config file.\n");
4055 #ifdef HAVE_LIBURING
4056 printf(" - proxy_uring: enable IO_URING for proxy backends.\n");
4057 #endif
4058 #endif
4059 #ifdef TLS
4060 printf(" - ssl_chain_cert: certificate chain file in PEM format\n"
4061 " - ssl_key: private key, if not part of the -ssl_chain_cert\n"
4062 " - ssl_keyformat: private key format (PEM, DER or ENGINE) (default: PEM)\n");
4063 printf(" - ssl_verify_mode: peer certificate verification mode, default is 0(None).\n"
4064 " valid values are 0(None), 1(Request), 2(Require)\n"
4065 " or 3(Once)\n");
4066 printf(" - ssl_ciphers: specify cipher list to be used\n"
4067 " - ssl_ca_cert: PEM format file of acceptable client CA's\n"
4068 " - ssl_wbuf_size: size in kilobytes of per-connection SSL output buffer\n"
4069 " (default: %u)\n", settings.ssl_wbuf_size / (1 << 10));
4070 printf(" - ssl_session_cache: enable server-side SSL session cache, to support session\n"
4071 " resumption\n"
4072 " - ssl_min_version: minimum protocol version to accept (default: %s)\n"
4073 #if defined(TLS1_3_VERSION)
4074 " valid values are 0(%s), 1(%s), 2(%s), or 3(%s).\n",
4075 ssl_proto_text(settings.ssl_min_version),
4076 ssl_proto_text(TLS1_VERSION), ssl_proto_text(TLS1_1_VERSION),
4077 ssl_proto_text(TLS1_2_VERSION), ssl_proto_text(TLS1_3_VERSION));
4078 #else
4079 " valid values are 0(%s), 1(%s), or 2(%s).\n",
4080 ssl_proto_text(settings.ssl_min_version),
4081 ssl_proto_text(TLS1_VERSION), ssl_proto_text(TLS1_1_VERSION),
4082 ssl_proto_text(TLS1_2_VERSION));
4083 #endif
4084 verify_default("ssl_keyformat", settings.ssl_keyformat == SSL_FILETYPE_PEM);
4085 verify_default("ssl_verify_mode", settings.ssl_verify_mode == SSL_VERIFY_NONE);
4086 verify_default("ssl_min_version", settings.ssl_min_version == TLS1_2_VERSION);
4087 #endif
4088 printf("-N, --napi_ids number of napi ids. see doc/napi_ids.txt for more details\n");
4089 return;
4090 }
4091
4092 static void usage_license(void) {
4093 printf(PACKAGE " " VERSION "\n\n");
4094 printf(
4095 "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
4096 "All rights reserved.\n"
4097 "\n"
4098 "Redistribution and use in source and binary forms, with or without\n"
4099 "modification, are permitted provided that the following conditions are\n"
4100 "met:\n"
4101 "\n"
4102 " * Redistributions of source code must retain the above copyright\n"
4103 "notice, this list of conditions and the following disclaimer.\n"
4104 "\n"
4105 " * Redistributions in binary form must reproduce the above\n"
4106 "copyright notice, this list of conditions and the following disclaimer\n"
4107 "in the documentation and/or other materials provided with the\n"
4108 "distribution.\n"
4109 "\n"
4110 " * Neither the name of the Danga Interactive nor the names of its\n"
4111 "contributors may be used to endorse or promote products derived from\n"
4112 "this software without specific prior written permission.\n"
4113 "\n"
4114 "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
4115 "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
4116 "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
4117 "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
4118 "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
4119 "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
4120 "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
4121 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
4122 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
4123 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
4124 "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
4125 "\n"
4126 "\n"
4127 "This product includes software developed by Niels Provos.\n"
4128 "\n"
4129 "[ libevent ]\n"
4130 "\n"
4131 "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
4132 "All rights reserved.\n"
4133 "\n"
4134 "Redistribution and use in source and binary forms, with or without\n"
4135 "modification, are permitted provided that the following conditions\n"
4136 "are met:\n"
4137 "1. Redistributions of source code must retain the above copyright\n"
4138 " notice, this list of conditions and the following disclaimer.\n"
4139 "2. Redistributions in binary form must reproduce the above copyright\n"
4140 " notice, this list of conditions and the following disclaimer in the\n"
4141 " documentation and/or other materials provided with the distribution.\n"
4142 "3. All advertising materials mentioning features or use of this software\n"
4143 " must display the following acknowledgement:\n"
4144 " This product includes software developed by Niels Provos.\n"
4145 "4. The name of the author may not be used to endorse or promote products\n"
4146 " derived from this software without specific prior written permission.\n"
4147 "\n"
4148 "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
4149 "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
4150 "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
4151 "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
4152 "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
4153 "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
4154 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
4155 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
4156 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
4157 "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
4158 );
4159
4160 return;
4161 }
4162
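/* Write our PID to pid_file, warning first if the file already names a
* still-running process. */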
4163 static void save_pid(const char *pid_file) {
4164 FILE *fp;
4165 if (access(pid_file, F_OK) == 0) {
4166 if ((fp = fopen(pid_file, "r")) != NULL) {
4167 char buffer[1024];
4168 if (fgets(buffer, sizeof(buffer), fp) != NULL) {
4169 unsigned int pid;
4170 if (safe_strtoul(buffer, &pid) && kill((pid_t)pid, 0) == 0) {
4171 fprintf(stderr, "WARNING: The pid file contained the following (running) pid: %u\n", pid);
4172 }
4173 }
4174 fclose(fp);
4175 }
4176 }
4177
4178 /* Create the pid file first with a temporary name, then
4179 * atomically move the file to the real name to avoid a race with
4180 * another process opening the file to read the pid, but finding
4181 * it empty.
4182 */
4183 char tmp_pid_file[1024];
4184 snprintf(tmp_pid_file, sizeof(tmp_pid_file), "%s.tmp", pid_file);
4185
4186 if ((fp = fopen(tmp_pid_file, "w")) == NULL) {
4187 vperror("Could not open the pid file %s for writing", tmp_pid_file);
4188 return;
4189 }
4190
4191 fprintf(fp, "%ld\n", (long)getpid());
4192 if (fclose(fp) == EOF) {
4193 vperror("Could not close the pid file %s", tmp_pid_file);
4194 }
4195
4196 if (rename(tmp_pid_file, pid_file) != 0) {
4197 vperror("Could not rename the pid file from %s to %s",
4198 tmp_pid_file, pid_file);
4199 }
4200 }
4201
4202 static void remove_pidfile(const char *pid_file) {
4203 if (pid_file == NULL)
4204 return;
4205
4206 if (unlink(pid_file) != 0) {
4207 vperror("Could not remove the pid file %s", pid_file);
4208 }
4209
4210 }
4211
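/* Handler for the fatal termination signals (SIGINT/SIGTERM): ask the main
* event loop to stop normally. Note printf() is not async-signal-safe, so
* the message here is best-effort. */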
4212 static void sig_handler(const int sig) {
4213 stop_main_loop = EXIT_NORMALLY;
4214 printf("Signal handled: %s.\n", strsignal(sig));
4215 }
4216
4217 static void sighup_handler(const int sig) {
4218 settings.sig_hup = true;
4219 }
4220
4221 static void sig_usrhandler(const int sig) {
4222 printf("Graceful shutdown signal handled: %s.\n", strsignal(sig));
4223 stop_main_loop = GRACE_STOP;
4224 }
4225
4226 /*
4227 * On systems that support multiple page sizes we may reduce the
4228 * number of TLB misses by using the biggest available page size
4229 */
4230 static int enable_large_pages(void) {
4231 #if defined(HAVE_GETPAGESIZES) && defined(HAVE_MEMCNTL)
4232 int ret = -1;
4233 size_t sizes[32];
4234 int avail = getpagesizes(sizes, 32);
4235 if (avail != -1) {
4236 size_t max = sizes[0];
4237 struct memcntl_mha arg = {0};
4238 int ii;
4239
4240 for (ii = 1; ii < avail; ++ii) {
4241 if (max < sizes[ii]) {
4242 max = sizes[ii];
4243 }
4244 }
4245
4246 arg.mha_flags = 0;
4247 arg.mha_pagesize = max;
4248 arg.mha_cmd = MHA_MAPSIZE_BSSBRK;
4249
4250 if (memcntl(0, 0, MC_HAT_ADVISE, (caddr_t)&arg, 0, 0) == -1) {
4251 fprintf(stderr, "Failed to set large pages: %s\n",
4252 strerror(errno));
4253 fprintf(stderr, "Will use default page size\n");
4254 } else {
4255 ret = 0;
4256 }
4257 } else {
4258 fprintf(stderr, "Failed to get supported pagesizes: %s\n",
4259 strerror(errno));
4260 fprintf(stderr, "Will use default page size\n");
4261 }
4262
4263 return ret;
4264 #elif defined(__linux__) && defined(MADV_HUGEPAGE)
4265 /* check if transparent hugepages is compiled into the kernel */
4266 /* RH-based systems may use a different path */
4267 static const char *mm_thp_paths[] = {
4268 "/sys/kernel/mm/transparent_hugepage/enabled",
4269 "/sys/kernel/mm/redhat_transparent_hugepage/enabled",
4270 NULL
4271 };
4272
4273 char thpb[128] = {0};
4274 int pfd = -1;
4275 for (const char **p = mm_thp_paths; *p; p++) {
4276 if ((pfd = open(*p, O_RDONLY)) != -1)
4277 break;
4278 }
4279
4280 if (pfd == -1) {
4281 fprintf(stderr, "Transparent huge pages support not detected.\n");
4282 fprintf(stderr, "Will use default page size.\n");
4283 return -1;
4284 }
4285 ssize_t rd = read(pfd, thpb, sizeof(thpb) - 1); // leave room for the NUL terminator written below
4286 close(pfd);
4287 if (rd <= 0) {
4288 fprintf(stderr, "Transparent huge pages could not read the configuration.\n");
4289 fprintf(stderr, "Will use default page size.\n");
4290 return -1;
4291 }
4292 thpb[rd] = 0;
4293 if (strstr(thpb, "[never]")) {
4294 fprintf(stderr, "Transparent huge pages support disabled.\n");
4295 fprintf(stderr, "Will use default page size.\n");
4296 return -1;
4297 }
4298 return 0;
4299 #elif defined(__FreeBSD__)
4300 int spages;
4301 size_t spagesl = sizeof(spages);
4302
4303 if (sysctlbyname("vm.pmap.pg_ps_enabled", &spages,
4304 &spagesl, NULL, 0) != 0) {
4305 fprintf(stderr, "Could not evaluate the presence of superpages features.");
4306 return -1;
4307 }
4308 if (spages != 1) {
4309 fprintf(stderr, "Superpages support not detected.\n");
4310 fprintf(stderr, "Will use default page size.\n");
4311 return -1;
4312 }
4313 return 0;
4314 #else
4315 return -1;
4316 #endif
4317 }
4318
4319 /**
4320 * Do basic sanity check of the runtime environment
4321 * @return true if no errors found, false if we can't use this env
4322 */
4323 static bool sanitycheck(void) {
4324 /* One of our biggest problems is old and bogus libevents */
4325 const char *ever = event_get_version();
4326 if (ever != NULL) {
4327 if (strncmp(ever, "1.", 2) == 0) {
4328 fprintf(stderr, "You are using libevent %s.\nPlease upgrade to 2.x"
4329 " or newer\n", event_get_version());
4330 return false;
4331 }
4332 }
4333
4334 return true;
4335 }
4336
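/* Parse a '-'-separated, strictly ascending list of slab class sizes into
* slab_sizes[], terminating the array with 0. Each size must be at least
* settings.chunk_size, at most settings.slab_chunk_size_max, and more than
* CHUNK_ALIGN_BYTES larger than the previous class; e.g. (illustrative)
* "96-192-384-768". Returns false on malformed input. */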
4337 static bool _parse_slab_sizes(char *s, uint32_t *slab_sizes) {
4338 char *b = NULL;
4339 uint32_t size = 0;
4340 int i = 0;
4341 uint32_t last_size = 0;
4342
4343 if (strlen(s) < 1)
4344 return false;
4345
4346 for (char *p = strtok_r(s, "-", &b);
4347 p != NULL;
4348 p = strtok_r(NULL, "-", &b)) {
4349 if (!safe_strtoul(p, &size) || size < settings.chunk_size
4350 || size > settings.slab_chunk_size_max) {
4351 fprintf(stderr, "slab size %u is out of valid range\n", size);
4352 return false;
4353 }
4354 if (last_size >= size) {
4355 fprintf(stderr, "slab size %u cannot be lower than or equal to a previous class size\n", size);
4356 return false;
4357 }
4358 if (size <= last_size + CHUNK_ALIGN_BYTES) {
4359 fprintf(stderr, "slab size %u must be at least %d bytes larger than previous class\n",
4360 size, CHUNK_ALIGN_BYTES);
4361 return false;
4362 }
4363 slab_sizes[i++] = size;
4364 last_size = size;
4365 if (i >= MAX_NUMBER_OF_SLAB_CLASSES-1) {
4366 fprintf(stderr, "too many slab classes specified\n");
4367 return false;
4368 }
4369 }
4370
4371 slab_sizes[i] = 0;
4372 return true;
4373 }
4374
4375 struct _mc_meta_data {
4376 void *mmap_base;
4377 uint64_t old_base;
4378 char *slab_config; // string containing either factor or custom slab list.
4379 int64_t time_delta;
4380 uint64_t process_started;
4381 uint32_t current_time;
4382 };
4383
4384 // We need to remember a combination of configuration settings and global
4385 // state for restart viability and resumption of internal services.
4386 // Compared to the number of tunables and state values, relatively little
4387 // needs to be remembered.
4388 // Time is the hardest: we have to assume the system clock is correct and
4389 // re-sync for the time lost during the restart.
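// Conceptually the saved state is a flat list of key/value pairs (the
// on-disk encoding is handled by restart.c); an illustrative subset:
// slab_config 1.25
// maxbytes 67108864
// use_cas true
// _mc_meta_load_cb() below re-reads these and validates them against the
// currently configured values before the cache memory is reused.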
4390 static int _mc_meta_save_cb(const char *tag, void *ctx, void *data) {
4391 struct _mc_meta_data *meta = (struct _mc_meta_data *)data;
4392
4393 // Settings to remember.
4394 // TODO: we should have a numeric form of VERSION, else compatibility
4395 // comparisons are difficult.
4396 // It may be possible to punt on this for now, since we can test for the
4397 // absence of another key... such as a new numeric version.
4398 //restart_set_kv(ctx, "version", "%s", VERSION);
4399 // We hold the original factor or subopts _string_
4400 // it can be directly compared without roundtripping through floats or
4401 // serializing/deserializing the long options list.
4402 restart_set_kv(ctx, "slab_config", "%s", meta->slab_config);
4403 restart_set_kv(ctx, "maxbytes", "%llu", (unsigned long long) settings.maxbytes);
4404 restart_set_kv(ctx, "chunk_size", "%d", settings.chunk_size);
4405 restart_set_kv(ctx, "item_size_max", "%d", settings.item_size_max);
4406 restart_set_kv(ctx, "slab_chunk_size_max", "%d", settings.slab_chunk_size_max);
4407 restart_set_kv(ctx, "slab_page_size", "%d", settings.slab_page_size);
4408 restart_set_kv(ctx, "use_cas", "%s", settings.use_cas ? "true" : "false");
4409 restart_set_kv(ctx, "slab_reassign", "%s", settings.slab_reassign ? "true" : "false");
4410
4411 // Online state to remember.
4412
4413 // current time is tough. we need to rely on the clock being correct to
4414 // pull the delta between stop and start times. we also need to know the
4415 // delta between start time and now to restore monotonic clocks.
4416 // for non-monotonic clocks (some OS?), process_started is the only
4417 // important one.
4418 restart_set_kv(ctx, "current_time", "%u", current_time);
4419 // time_t may be a wide type on some systems, but we assume it is
4420 // never negative.
4421 restart_set_kv(ctx, "process_started", "%llu", (unsigned long long) process_started);
4422 {
4423 struct timeval tv;
4424 gettimeofday(&tv, NULL);
4425 restart_set_kv(ctx, "stop_time", "%lu", tv.tv_sec);
4426 }
4427
4428 // Might as well just fetch the next CAS value to use than tightly
4429 // coupling the internal variable into the restart system.
4430 restart_set_kv(ctx, "current_cas", "%llu", (unsigned long long) get_cas_id());
4431 restart_set_kv(ctx, "oldest_cas", "%llu", (unsigned long long) settings.oldest_cas);
4432 restart_set_kv(ctx, "logger_gid", "%llu", logger_get_gid());
4433 restart_set_kv(ctx, "hashpower", "%u", stats_state.hash_power_level);
4434 // NOTE: oldest_live is a rel_time_t, which aliases for unsigned int.
4435 // should future proof this with a 64bit upcast, or fetch value from a
4436 // converter function/macro?
4437 restart_set_kv(ctx, "oldest_live", "%u", settings.oldest_live);
4438 // TODO: use uintptr_t etc? is it portable enough?
4439 restart_set_kv(ctx, "mmap_oldbase", "%p", meta->mmap_base);
4440
4441 return 0;
4442 }
4443
4444 // We must see at least this number of recognized lines; otherwise empty or
4445 // missing lines could cause a false positive.
4446 // TODO: Once crc32'ing of the metadata file is done this could be ensured better by
4447 // the restart module itself (crc32 + count of lines must match on the
4448 // backend)
4449 #define RESTART_REQUIRED_META 17
4450
4451 // With this callback we make a decision on if the current configuration
4452 // matches up enough to allow reusing the cache.
4453 // We also re-load important runtime information.
4454 static int _mc_meta_load_cb(const char *tag, void *ctx, void *data) {
4455 struct _mc_meta_data *meta = (struct _mc_meta_data *)data;
4456 char *key;
4457 char *val;
4458 int reuse_mmap = 0;
4459 meta->process_started = 0;
4460 meta->time_delta = 0;
4461 meta->current_time = 0;
4462 int lines_seen = 0;
4463
4464 // TODO: not sure this is any better than just doing an if/else tree with
4465 // strcmp's...
4466 enum {
4467 R_MMAP_OLDBASE = 0,
4468 R_MAXBYTES,
4469 R_CHUNK_SIZE,
4470 R_ITEM_SIZE_MAX,
4471 R_SLAB_CHUNK_SIZE_MAX,
4472 R_SLAB_PAGE_SIZE,
4473 R_SLAB_CONFIG,
4474 R_USE_CAS,
4475 R_SLAB_REASSIGN,
4476 R_CURRENT_CAS,
4477 R_OLDEST_CAS,
4478 R_OLDEST_LIVE,
4479 R_LOGGER_GID,
4480 R_CURRENT_TIME,
4481 R_STOP_TIME,
4482 R_PROCESS_STARTED,
4483 R_HASHPOWER,
4484 };
4485
4486 const char *opts[] = {
4487 [R_MMAP_OLDBASE] = "mmap_oldbase",
4488 [R_MAXBYTES] = "maxbytes",
4489 [R_CHUNK_SIZE] = "chunk_size",
4490 [R_ITEM_SIZE_MAX] = "item_size_max",
4491 [R_SLAB_CHUNK_SIZE_MAX] = "slab_chunk_size_max",
4492 [R_SLAB_PAGE_SIZE] = "slab_page_size",
4493 [R_SLAB_CONFIG] = "slab_config",
4494 [R_USE_CAS] = "use_cas",
4495 [R_SLAB_REASSIGN] = "slab_reassign",
4496 [R_CURRENT_CAS] = "current_cas",
4497 [R_OLDEST_CAS] = "oldest_cas",
4498 [R_OLDEST_LIVE] = "oldest_live",
4499 [R_LOGGER_GID] = "logger_gid",
4500 [R_CURRENT_TIME] = "current_time",
4501 [R_STOP_TIME] = "stop_time",
4502 [R_PROCESS_STARTED] = "process_started",
4503 [R_HASHPOWER] = "hashpower",
4504 NULL
4505 };
4506
4507 while (restart_get_kv(ctx, &key, &val) == RESTART_OK) {
4508 int type = 0;
4509 int32_t val_int = 0;
4510 uint32_t val_uint = 0;
4511 int64_t bigval_int = 0;
4512 uint64_t bigval_uint = 0;
4513
4514 while (opts[type] != NULL && strcmp(key, opts[type]) != 0) {
4515 type++;
4516 }
4517 if (opts[type] == NULL) {
4518 fprintf(stderr, "[restart] unknown/unhandled key: %s\n", key);
4519 continue;
4520 }
4521 lines_seen++;
4522
4523 // helper for any boolean checkers.
4524 bool val_bool = false;
4525 bool is_bool = true;
4526 if (strcmp(val, "false") == 0) {
4527 val_bool = false;
4528 } else if (strcmp(val, "true") == 0) {
4529 val_bool = true;
4530 } else {
4531 is_bool = false;
4532 }
4533
4534 switch (type) {
4535 case R_MMAP_OLDBASE:
4536 if (!safe_strtoull_hex(val, &meta->old_base)) {
4537 fprintf(stderr, "[restart] failed to parse %s: %s\n", key, val);
4538 reuse_mmap = -1;
4539 }
4540 break;
4541 case R_MAXBYTES:
4542 if (!safe_strtoll(val, &bigval_int) || settings.maxbytes != bigval_int) {
4543 reuse_mmap = -1;
4544 }
4545 break;
4546 case R_CHUNK_SIZE:
4547 if (!safe_strtol(val, &val_int) || settings.chunk_size != val_int) {
4548 reuse_mmap = -1;
4549 }
4550 break;
4551 case R_ITEM_SIZE_MAX:
4552 if (!safe_strtol(val, &val_int) || settings.item_size_max != val_int) {
4553 reuse_mmap = -1;
4554 }
4555 break;
4556 case R_SLAB_CHUNK_SIZE_MAX:
4557 if (!safe_strtol(val, &val_int) || settings.slab_chunk_size_max != val_int) {
4558 reuse_mmap = -1;
4559 }
4560 break;
4561 case R_SLAB_PAGE_SIZE:
4562 if (!safe_strtol(val, &val_int) || settings.slab_page_size != val_int) {
4563 reuse_mmap = -1;
4564 }
4565 break;
4566 case R_SLAB_CONFIG:
4567 if (strcmp(val, meta->slab_config) != 0) {
4568 reuse_mmap = -1;
4569 }
4570 break;
4571 case R_USE_CAS:
4572 if (!is_bool || settings.use_cas != val_bool) {
4573 reuse_mmap = -1;
4574 }
4575 break;
4576 case R_SLAB_REASSIGN:
4577 if (!is_bool || settings.slab_reassign != val_bool) {
4578 reuse_mmap = -1;
4579 }
4580 break;
4581 case R_CURRENT_CAS:
4582 // FIXME: do we need to fail if these values _aren't_ found?
4583 if (!safe_strtoull(val, &bigval_uint)) {
4584 reuse_mmap = -1;
4585 } else {
4586 set_cas_id(bigval_uint);
4587 }
4588 break;
4589 case R_OLDEST_CAS:
4590 if (!safe_strtoull(val, &bigval_uint)) {
4591 reuse_mmap = -1;
4592 } else {
4593 settings.oldest_cas = bigval_uint;
4594 }
4595 break;
4596 case R_OLDEST_LIVE:
4597 if (!safe_strtoul(val, &val_uint)) {
4598 reuse_mmap = -1;
4599 } else {
4600 settings.oldest_live = val_uint;
4601 }
4602 break;
4603 case R_LOGGER_GID:
4604 if (!safe_strtoull(val, &bigval_uint)) {
4605 reuse_mmap = -1;
4606 } else {
4607 logger_set_gid(bigval_uint);
4608 }
4609 break;
4610 case R_PROCESS_STARTED:
4611 if (!safe_strtoull(val, &bigval_uint)) {
4612 reuse_mmap = -1;
4613 } else {
4614 meta->process_started = bigval_uint;
4615 }
4616 break;
4617 case R_CURRENT_TIME:
4618 if (!safe_strtoul(val, &val_uint)) {
4619 reuse_mmap = -1;
4620 } else {
4621 meta->current_time = val_uint;
4622 }
4623 break;
4624 case R_STOP_TIME:
4625 if (!safe_strtoll(val, &bigval_int)) {
4626 reuse_mmap = -1;
4627 } else {
4628 struct timeval t;
4629 gettimeofday(&t, NULL);
4630 meta->time_delta = t.tv_sec - bigval_int;
4631 // clock has done something crazy.
4632 // there are _lots_ of ways the clock can go wrong here, but
4633 // this is a safe sanity check since there's nothing else we
4634 // can realistically do.
4635 if (meta->time_delta <= 0) {
4636 reuse_mmap = -1;
4637 }
4638 }
4639 break;
4640 case R_HASHPOWER:
4641 if (!safe_strtoul(val, &val_uint)) {
4642 reuse_mmap = -1;
4643 } else {
4644 settings.hashpower_init = val_uint;
4645 }
4646 break;
4647 default:
4648 fprintf(stderr, "[restart] unhandled key: %s\n", key);
4649 }
4650
4651 if (reuse_mmap != 0) {
4652 fprintf(stderr, "[restart] restart incompatible due to setting for [%s] [old value: %s]\n", key, val);
4653 break;
4654 }
4655 }
4656
4657 if (lines_seen < RESTART_REQUIRED_META) {
4658 fprintf(stderr, "[restart] missing some metadata lines\n");
4659 reuse_mmap = -1;
4660 }
4661
4662 return reuse_mmap;
4663 }
4664
4665 int main (int argc, char **argv) {
4666 int c;
4667 bool lock_memory = false;
4668 bool do_daemonize = false;
4669 bool preallocate = false;
4670 int maxcore = 0;
4671 char *username = NULL;
4672 char *pid_file = NULL;
4673 struct passwd *pw;
4674 struct rlimit rlim;
4675 char *buf;
4676 char unit = '\0';
4677 int size_max = 0;
4678 int retval = EXIT_SUCCESS;
4679 bool protocol_specified = false;
4680 bool tcp_specified = false;
4681 bool udp_specified = false;
4682 bool start_lru_maintainer = true;
4683 bool start_lru_crawler = true;
4684 bool start_assoc_maint = true;
4685 enum hashfunc_type hash_type = MURMUR3_HASH;
4686 uint32_t tocrawl;
4687 uint32_t slab_sizes[MAX_NUMBER_OF_SLAB_CLASSES];
4688 bool use_slab_sizes = false;
4689 char *slab_sizes_unparsed = NULL;
4690 bool slab_chunk_size_changed = false;
4691 // struct for restart code. Initialized up here so we can curry
4692 // important settings to save or validate.
4693 struct _mc_meta_data *meta = malloc(sizeof(struct _mc_meta_data));
if (meta == NULL) {
fprintf(stderr, "failed to allocate memory for restart metadata\n");
exit(EXIT_FAILURE);
}
4694 meta->slab_config = NULL;
4695 char *subopts, *subopts_orig;
4696 char *subopts_value;
4697 enum {
4698 MAXCONNS_FAST = 0,
4699 HASHPOWER_INIT,
4700 NO_HASHEXPAND,
4701 SLAB_REASSIGN,
4702 SLAB_AUTOMOVE,
4703 SLAB_AUTOMOVE_RATIO,
4704 SLAB_AUTOMOVE_WINDOW,
4705 TAIL_REPAIR_TIME,
4706 HASH_ALGORITHM,
4707 LRU_CRAWLER,
4708 LRU_CRAWLER_SLEEP,
4709 LRU_CRAWLER_TOCRAWL,
4710 LRU_MAINTAINER,
4711 HOT_LRU_PCT,
4712 WARM_LRU_PCT,
4713 HOT_MAX_FACTOR,
4714 WARM_MAX_FACTOR,
4715 TEMPORARY_TTL,
4716 IDLE_TIMEOUT,
4717 WATCHER_LOGBUF_SIZE,
4718 WORKER_LOGBUF_SIZE,
4719 SLAB_SIZES,
4720 SLAB_CHUNK_MAX,
4721 TRACK_SIZES,
4722 NO_INLINE_ASCII_RESP,
4723 MODERN,
4724 NO_MODERN,
4725 NO_CHUNKED_ITEMS,
4726 NO_SLAB_REASSIGN,
4727 NO_SLAB_AUTOMOVE,
4728 NO_MAXCONNS_FAST,
4729 INLINE_ASCII_RESP,
4730 NO_LRU_CRAWLER,
4731 NO_LRU_MAINTAINER,
4732 NO_DROP_PRIVILEGES,
4733 DROP_PRIVILEGES,
4734 RESP_OBJ_MEM_LIMIT,
4735 READ_BUF_MEM_LIMIT,
4736 META_RESPONSE_OLD,
4737 #ifdef TLS
4738 SSL_CERT,
4739 SSL_KEY,
4740 SSL_VERIFY_MODE,
4741 SSL_KEYFORM,
4742 SSL_CIPHERS,
4743 SSL_CA_CERT,
4744 SSL_WBUF_SIZE,
4745 SSL_SESSION_CACHE,
4746 SSL_MIN_VERSION,
4747 #endif
4748 #ifdef PROXY
4749 PROXY_CONFIG,
4750 PROXY_URING,
4751 #endif
4752 #ifdef MEMCACHED_DEBUG
4753 RELAXED_PRIVILEGES,
4754 #endif
4755 };
4756 char *const subopts_tokens[] = {
4757 [MAXCONNS_FAST] = "maxconns_fast",
4758 [HASHPOWER_INIT] = "hashpower",
4759 [NO_HASHEXPAND] = "no_hashexpand",
4760 [SLAB_REASSIGN] = "slab_reassign",
4761 [SLAB_AUTOMOVE] = "slab_automove",
4762 [SLAB_AUTOMOVE_RATIO] = "slab_automove_ratio",
4763 [SLAB_AUTOMOVE_WINDOW] = "slab_automove_window",
4764 [TAIL_REPAIR_TIME] = "tail_repair_time",
4765 [HASH_ALGORITHM] = "hash_algorithm",
4766 [LRU_CRAWLER] = "lru_crawler",
4767 [LRU_CRAWLER_SLEEP] = "lru_crawler_sleep",
4768 [LRU_CRAWLER_TOCRAWL] = "lru_crawler_tocrawl",
4769 [LRU_MAINTAINER] = "lru_maintainer",
4770 [HOT_LRU_PCT] = "hot_lru_pct",
4771 [WARM_LRU_PCT] = "warm_lru_pct",
4772 [HOT_MAX_FACTOR] = "hot_max_factor",
4773 [WARM_MAX_FACTOR] = "warm_max_factor",
4774 [TEMPORARY_TTL] = "temporary_ttl",
4775 [IDLE_TIMEOUT] = "idle_timeout",
4776 [WATCHER_LOGBUF_SIZE] = "watcher_logbuf_size",
4777 [WORKER_LOGBUF_SIZE] = "worker_logbuf_size",
4778 [SLAB_SIZES] = "slab_sizes",
4779 [SLAB_CHUNK_MAX] = "slab_chunk_max",
4780 [TRACK_SIZES] = "track_sizes",
4781