"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "memcached.c" between
memcached-1.6.8.tar.gz and memcached-1.6.9.tar.gz

About: memcached is a high-performance, distributed memory object caching system, generic in nature, but originally intended for use in speeding up dynamic web applications by alleviating database load.

memcached.c  (memcached-1.6.8):memcached.c  (memcached-1.6.9)
skipping to change at line 17 skipping to change at line 17
* Copyright 2003 Danga Interactive, Inc. All rights reserved. * Copyright 2003 Danga Interactive, Inc. All rights reserved.
* *
* Use and distribution licensed under the BSD license. See * Use and distribution licensed under the BSD license. See
* the LICENSE file for full text. * the LICENSE file for full text.
* *
* Authors: * Authors:
* Anatoly Vorobey <mellon@pobox.com> * Anatoly Vorobey <mellon@pobox.com>
* Brad Fitzpatrick <brad@danga.com> * Brad Fitzpatrick <brad@danga.com>
*/ */
#include "memcached.h" #include "memcached.h"
#ifdef EXTSTORE
#include "storage.h" #include "storage.h"
#endif
#include "authfile.h" #include "authfile.h"
#include "restart.h" #include "restart.h"
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/un.h> #include <sys/un.h>
#include <signal.h> #include <signal.h>
#include <sys/param.h> #include <sys/param.h>
#include <sys/resource.h> #include <sys/resource.h>
#include <sys/uio.h> #include <sys/uio.h>
#include <ctype.h> #include <ctype.h>
skipping to change at line 104 skipping to change at line 102
/* defaults */ /* defaults */
static void settings_init(void); static void settings_init(void);
/* event handling, network IO */ /* event handling, network IO */
static void event_handler(const evutil_socket_t fd, const short which, void *arg ); static void event_handler(const evutil_socket_t fd, const short which, void *arg );
static void conn_close(conn *c); static void conn_close(conn *c);
static void conn_init(void); static void conn_init(void);
static bool update_event(conn *c, const int new_flags); static bool update_event(conn *c, const int new_flags);
static void complete_nread(conn *c); static void complete_nread(conn *c);
#ifdef EXTSTORE
static void _get_extstore_cb(void *e, obj_io *io, int ret);
#endif
static void conn_free(conn *c); static void conn_free(conn *c);
/** exported globals **/ /** exported globals **/
struct stats stats; struct stats stats;
struct stats_state stats_state; struct stats_state stats_state;
struct settings settings; struct settings settings;
time_t process_started; /* when the process was started */ time_t process_started; /* when the process was started */
conn **conns; conn **conns;
struct slab_rebalance slab_rebal; struct slab_rebalance slab_rebal;
skipping to change at line 295 skipping to change at line 290
settings.dump_enabled = true; settings.dump_enabled = true;
settings.crawls_persleep = 1000; settings.crawls_persleep = 1000;
settings.logger_watcher_buf_size = LOGGER_WATCHER_BUF_SIZE; settings.logger_watcher_buf_size = LOGGER_WATCHER_BUF_SIZE;
settings.logger_buf_size = LOGGER_BUF_SIZE; settings.logger_buf_size = LOGGER_BUF_SIZE;
settings.drop_privileges = false; settings.drop_privileges = false;
settings.watch_enabled = true; settings.watch_enabled = true;
settings.read_buf_mem_limit = 0; settings.read_buf_mem_limit = 0;
#ifdef MEMCACHED_DEBUG #ifdef MEMCACHED_DEBUG
settings.relaxed_privileges = false; settings.relaxed_privileges = false;
#endif #endif
settings.num_napi_ids = 0;
settings.memory_file = NULL;
} }
extern pthread_mutex_t conn_lock; extern pthread_mutex_t conn_lock;
/* Connection timeout thread bits */ /* Connection timeout thread bits */
static pthread_t conn_timeout_tid; static pthread_t conn_timeout_tid;
static int do_run_conn_timeout_thread; static int do_run_conn_timeout_thread;
static pthread_cond_t conn_timeout_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t conn_timeout_lock = PTHREAD_MUTEX_INITIALIZER;
#define CONNS_PER_SLICE 100 #define CONNS_PER_SLICE 100
#define TIMEOUT_MSG_SIZE (1 + sizeof(int)) #define TIMEOUT_MSG_SIZE (1 + sizeof(int))
static void *conn_timeout_thread(void *arg) { static void *conn_timeout_thread(void *arg) {
int i; int i;
conn *c; conn *c;
char buf[TIMEOUT_MSG_SIZE]; char buf[TIMEOUT_MSG_SIZE];
rel_time_t oldest_last_cmd; rel_time_t oldest_last_cmd;
int sleep_time; int sleep_time;
int sleep_slice = max_fds / CONNS_PER_SLICE; int sleep_slice = max_fds / CONNS_PER_SLICE;
if (sleep_slice == 0) if (sleep_slice == 0)
sleep_slice = CONNS_PER_SLICE; sleep_slice = CONNS_PER_SLICE;
useconds_t timeslice = 1000000 / sleep_slice; useconds_t timeslice = 1000000 / sleep_slice;
mutex_lock(&conn_timeout_lock);
while(do_run_conn_timeout_thread) { while(do_run_conn_timeout_thread) {
if (settings.verbose > 2) if (settings.verbose > 2)
fprintf(stderr, "idle timeout thread at top of connection list\n"); fprintf(stderr, "idle timeout thread at top of connection list\n");
oldest_last_cmd = current_time; oldest_last_cmd = current_time;
for (i = 0; i < max_fds; i++) { for (i = 0; i < max_fds; i++) {
if ((i % CONNS_PER_SLICE) == 0) { if ((i % CONNS_PER_SLICE) == 0) {
if (settings.verbose > 2) if (settings.verbose > 2)
fprintf(stderr, "idle timeout thread sleeping for %ulus\n", fprintf(stderr, "idle timeout thread sleeping for %ulus\n",
skipping to change at line 363 skipping to change at line 363
/* This is the soonest we could have another connection time out */ /* This is the soonest we could have another connection time out */
sleep_time = settings.idle_timeout - (current_time - oldest_last_cmd) + 1; sleep_time = settings.idle_timeout - (current_time - oldest_last_cmd) + 1;
if (sleep_time <= 0) if (sleep_time <= 0)
sleep_time = 1; sleep_time = 1;
if (settings.verbose > 2) if (settings.verbose > 2)
fprintf(stderr, fprintf(stderr,
"idle timeout thread finished pass, sleeping for %ds\n", "idle timeout thread finished pass, sleeping for %ds\n",
sleep_time); sleep_time);
usleep((useconds_t) sleep_time * 1000000);
struct timeval now;
struct timespec to_sleep;
gettimeofday(&now, NULL);
to_sleep.tv_sec = now.tv_sec + sleep_time;
to_sleep.tv_nsec = 0;
pthread_cond_timedwait(&conn_timeout_cond, &conn_timeout_lock, &to_sleep
);
} }
mutex_unlock(&conn_timeout_lock);
return NULL; return NULL;
} }
static int start_conn_timeout_thread() { static int start_conn_timeout_thread() {
int ret; int ret;
if (settings.idle_timeout == 0) if (settings.idle_timeout == 0)
return -1; return -1;
do_run_conn_timeout_thread = 1; do_run_conn_timeout_thread = 1;
skipping to change at line 389 skipping to change at line 397
strerror(ret)); strerror(ret));
return -1; return -1;
} }
return 0; return 0;
} }
int stop_conn_timeout_thread(void) { int stop_conn_timeout_thread(void) {
if (!do_run_conn_timeout_thread) if (!do_run_conn_timeout_thread)
return -1; return -1;
mutex_lock(&conn_timeout_lock);
do_run_conn_timeout_thread = 0; do_run_conn_timeout_thread = 0;
pthread_cond_signal(&conn_timeout_cond);
mutex_unlock(&conn_timeout_lock);
pthread_join(conn_timeout_tid, NULL); pthread_join(conn_timeout_tid, NULL);
return 0; return 0;
} }
/* /*
* read buffer cache helper functions * read buffer cache helper functions
*/ */
static void rbuf_release(conn *c) { static void rbuf_release(conn *c) {
if (c->rbuf != NULL && c->rbytes == 0 && !IS_UDP(c->transport)) { if (c->rbuf != NULL && c->rbytes == 0 && !IS_UDP(c->transport)) {
if (c->rbuf_malloced) { if (c->rbuf_malloced) {
skipping to change at line 524 skipping to change at line 535
c->thread->stats.idle_kicks++; c->thread->stats.idle_kicks++;
pthread_mutex_unlock(&c->thread->stats.mutex); pthread_mutex_unlock(&c->thread->stats.mutex);
conn_set_state(c, conn_closing); conn_set_state(c, conn_closing);
drive_machine(c); drive_machine(c);
} }
} }
/* bring conn back from a sidethread. could have had its event base moved. */ /* bring conn back from a sidethread. could have had its event base moved. */
void conn_worker_readd(conn *c) { void conn_worker_readd(conn *c) {
if (c->state == conn_io_queue) {
c->io_queues_submitted--;
// If we're still waiting for other queues to return, don't re-add the
// connection yet.
if (c->io_queues_submitted != 0) {
return;
}
}
c->ev_flags = EV_READ | EV_PERSIST; c->ev_flags = EV_READ | EV_PERSIST;
event_set(&c->event, c->sfd, c->ev_flags, event_handler, (void *)c); event_set(&c->event, c->sfd, c->ev_flags, event_handler, (void *)c);
event_base_set(c->thread->base, &c->event); event_base_set(c->thread->base, &c->event);
// TODO: call conn_cleanup/fail/etc // TODO: call conn_cleanup/fail/etc
if (event_add(&c->event, 0) == -1) { if (event_add(&c->event, 0) == -1) {
perror("event_add"); perror("event_add");
} }
// side thread wanted us to close immediately. // side thread wanted us to close immediately.
if (c->state == conn_closing) { if (c->state == conn_closing) {
drive_machine(c); drive_machine(c);
return; return;
} else if (c->state == conn_io_queue) {
// machine will know how to return based on secondary state.
drive_machine(c);
} else {
conn_set_state(c, conn_new_cmd);
} }
c->state = conn_new_cmd; }
#ifdef EXTSTORE void conn_io_queue_add(conn *c, int type, void *ctx, io_queue_stack_cb cb, io_qu
// If we had IO objects, process eue_stack_cb com_cb, io_queue_cb fin_cb) {
if (c->io_wraplist) { io_queue_t *q = c->io_queues;
//assert(c->io_wrapleft == 0); // assert no more to process while (q->type != IO_QUEUE_NONE) {
conn_set_state(c, conn_mwrite); q++;
drive_machine(c); }
q->type = type;
q->ctx = ctx;
q->stack_ctx = NULL;
q->submit_cb = cb;
q->complete_cb = com_cb;
q->finalize_cb = fin_cb;
return;
}
io_queue_t *conn_io_queue_get(conn *c, int type) {
io_queue_t *q = c->io_queues;
while (q->type != IO_QUEUE_NONE) {
if (q->type == type) {
return q;
}
q++;
}
return NULL;
}
// called after returning to the main worker thread.
// users of the queue need to distinguish if the IO was actually consumed or
// not and handle appropriately.
static void conn_io_queue_complete(conn *c) {
io_queue_t *q = c->io_queues;
while (q->type != IO_QUEUE_NONE) {
// Reuse the same submit stack. We zero it out first so callbacks can
// queue new IO's if necessary.
if (q->stack_ctx) {
void *tmp = q->stack_ctx;
q->stack_ctx = NULL;
q->complete_cb(q->ctx, tmp);
}
q++;
} }
#endif
} }
conn *conn_new(const int sfd, enum conn_states init_state, conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags, const int event_flags,
const int read_buffer_size, enum network_transport transport, const int read_buffer_size, enum network_transport transport,
struct event_base *base, void *ssl) { struct event_base *base, void *ssl) {
conn *c; conn *c;
assert(sfd >= 0 && sfd < max_fds); assert(sfd >= 0 && sfd < max_fds);
c = conns[sfd]; c = conns[sfd];
skipping to change at line 654 skipping to change at line 710
c->cmd = -1; c->cmd = -1;
c->rbytes = 0; c->rbytes = 0;
c->rcurr = c->rbuf; c->rcurr = c->rbuf;
c->ritem = 0; c->ritem = 0;
c->rbuf_malloced = false; c->rbuf_malloced = false;
c->sasl_started = false; c->sasl_started = false;
c->set_stale = false; c->set_stale = false;
c->mset_res = false; c->mset_res = false;
c->close_after_write = false; c->close_after_write = false;
c->last_cmd_time = current_time; /* initialize for idle kicker */ c->last_cmd_time = current_time; /* initialize for idle kicker */
#ifdef EXTSTORE // wipe all queues.
c->io_wraplist = NULL; memset(c->io_queues, 0, sizeof(c->io_queues));
c->io_wrapleft = 0; c->io_queues_submitted = 0;
#endif
c->item = 0; c->item = 0;
c->noreply = false; c->noreply = false;
#ifdef TLS #ifdef TLS
if (ssl) { if (ssl) {
c->ssl = (SSL*)ssl; c->ssl = (SSL*)ssl;
c->read = ssl_read; c->read = ssl_read;
c->sendmsg = ssl_sendmsg; c->sendmsg = ssl_sendmsg;
skipping to change at line 724 skipping to change at line 779
STATS_LOCK(); STATS_LOCK();
stats_state.curr_conns++; stats_state.curr_conns++;
stats.total_conns++; stats.total_conns++;
STATS_UNLOCK(); STATS_UNLOCK();
MEMCACHED_CONN_ALLOCATE(c->sfd); MEMCACHED_CONN_ALLOCATE(c->sfd);
return c; return c;
} }
#ifdef EXTSTORE
static void recache_or_free(conn *c, io_wrap *wrap) {
item *it;
it = (item *)wrap->io.buf;
bool do_free = true;
if (wrap->active) {
// If request never dispatched, free the read buffer but leave the
// item header alone.
do_free = false;
size_t ntotal = ITEM_ntotal(wrap->hdr_it);
slabs_free(it, ntotal, slabs_clsid(ntotal));
c->io_wrapleft--;
assert(c->io_wrapleft >= 0);
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.get_aborted_extstore++;
pthread_mutex_unlock(&c->thread->stats.mutex);
} else if (wrap->miss) {
// If request was ultimately a miss, unlink the header.
do_free = false;
size_t ntotal = ITEM_ntotal(wrap->hdr_it);
item_unlink(wrap->hdr_it);
slabs_free(it, ntotal, slabs_clsid(ntotal));
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.miss_from_extstore++;
if (wrap->badcrc)
c->thread->stats.badcrc_from_extstore++;
pthread_mutex_unlock(&c->thread->stats.mutex);
} else if (settings.ext_recache_rate) {
// hashvalue is cuddled during store
uint32_t hv = (uint32_t)it->time;
// opt to throw away rather than wait on a lock.
void *hold_lock = item_trylock(hv);
if (hold_lock != NULL) {
item *h_it = wrap->hdr_it;
uint8_t flags = ITEM_LINKED|ITEM_FETCHED|ITEM_ACTIVE;
// Item must be recently hit at least twice to recache.
if (((h_it->it_flags & flags) == flags) &&
h_it->time > current_time - ITEM_UPDATE_INTERVAL &&
c->recache_counter++ % settings.ext_recache_rate == 0) {
do_free = false;
// In case it's been updated.
it->exptime = h_it->exptime;
it->it_flags &= ~ITEM_LINKED;
it->refcount = 0;
it->h_next = NULL; // might not be necessary.
STORAGE_delete(c->thread->storage, h_it);
item_replace(h_it, it, hv);
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.recache_from_extstore++;
pthread_mutex_unlock(&c->thread->stats.mutex);
}
}
if (hold_lock)
item_trylock_unlock(hold_lock);
}
if (do_free)
slabs_free(it, ITEM_ntotal(it), ITEM_clsid(it));
wrap->io.buf = NULL; // sanity.
wrap->io.next = NULL;
wrap->next = NULL;
wrap->active = false;
// TODO: reuse lock and/or hv.
item_remove(wrap->hdr_it);
}
#endif
void conn_release_items(conn *c) { void conn_release_items(conn *c) {
assert(c != NULL); assert(c != NULL);
if (c->item) { if (c->item) {
item_remove(c->item); item_remove(c->item);
c->item = 0; c->item = 0;
} }
#ifdef EXTSTORE
if (c->io_wraplist) {
io_wrap *tmp = c->io_wraplist;
while (tmp) {
io_wrap *next = tmp->next;
recache_or_free(c, tmp);
// malloc'ed iovec list used for chunked extstore fetches.
if (tmp->io.iov) {
free(tmp->io.iov);
tmp->io.iov = NULL;
}
do_cache_free(c->thread->io_cache, tmp); // lockless
tmp = next;
}
c->io_wraplist = NULL;
}
#endif
// Cull any unsent responses. // Cull any unsent responses.
if (c->resp_head) { if (c->resp_head) {
mc_resp *resp = c->resp_head; mc_resp *resp = c->resp_head;
// r_f() handles the chain maintenance. // r_f() handles the chain maintenance.
while (resp) { while (resp) {
// temporary by default. hide behind a debug flag in the future: // temporary by default. hide behind a debug flag in the future:
// double free detection. Transmit loops can drop out early, but // double free detection. Transmit loops can drop out early, but
// here we could infinite loop. // here we could infinite loop.
if (resp->free) { if (resp->free) {
fprintf(stderr, "ERROR: double free detected during conn_release _items(): [%d] [%s]\n", fprintf(stderr, "ERROR: double free detected during conn_release _items(): [%d] [%s]\n",
skipping to change at line 940 skipping to change at line 911
"conn_new_cmd", "conn_new_cmd",
"conn_waiting", "conn_waiting",
"conn_read", "conn_read",
"conn_parse_cmd", "conn_parse_cmd",
"conn_write", "conn_write",
"conn_nread", "conn_nread",
"conn_swallow", "conn_swallow",
"conn_closing", "conn_closing",
"conn_mwrite", "conn_mwrite",
"conn_closed", "conn_closed",
"conn_watch" }; "conn_watch",
"conn_io_queue" };
return statenames[state]; return statenames[state];
} }
/* /*
* Sets a connection's current state in the state machine. Any special * Sets a connection's current state in the state machine. Any special
* processing that needs to happen on certain state transitions can * processing that needs to happen on certain state transitions can
* happen here. * happen here.
*/ */
void conn_set_state(conn *c, enum conn_states state) { void conn_set_state(conn *c, enum conn_states state) {
assert(c != NULL); assert(c != NULL);
skipping to change at line 1165 skipping to change at line 1137
mc_resp* resp_finish(conn *c, mc_resp *resp) { mc_resp* resp_finish(conn *c, mc_resp *resp) {
mc_resp *next = resp->next; mc_resp *next = resp->next;
if (resp->item) { if (resp->item) {
// TODO: cache hash value in resp obj? // TODO: cache hash value in resp obj?
item_remove(resp->item); item_remove(resp->item);
resp->item = NULL; resp->item = NULL;
} }
if (resp->write_and_free) { if (resp->write_and_free) {
free(resp->write_and_free); free(resp->write_and_free);
} }
if (resp->io_pending) {
// If we had a pending IO, tell it to internally clean up then return
// the main object back to our thread cache.
resp->io_pending->q->finalize_cb(resp->io_pending);
do_cache_free(c->thread->io_cache, resp->io_pending);
resp->io_pending = NULL;
}
if (c->resp_head == resp) { if (c->resp_head == resp) {
c->resp_head = next; c->resp_head = next;
} }
if (c->resp == resp) { if (c->resp == resp) {
c->resp = NULL; c->resp = NULL;
} }
resp_free(c, resp); resp_free(c, resp);
THR_STATS_LOCK(c); THR_STATS_LOCK(c);
c->thread->stats.response_obj_count--; c->thread->stats.response_obj_count--;
THR_STATS_UNLOCK(c); THR_STATS_UNLOCK(c);
skipping to change at line 1693 skipping to change at line 1672
/* return server specific stats only */ /* return server specific stats only */
void server_stats(ADD_STAT add_stats, conn *c) { void server_stats(ADD_STAT add_stats, conn *c) {
pid_t pid = getpid(); pid_t pid = getpid();
rel_time_t now = current_time; rel_time_t now = current_time;
struct thread_stats thread_stats; struct thread_stats thread_stats;
threadlocal_stats_aggregate(&thread_stats); threadlocal_stats_aggregate(&thread_stats);
struct slab_stats slab_stats; struct slab_stats slab_stats;
slab_stats_aggregate(&thread_stats, &slab_stats); slab_stats_aggregate(&thread_stats, &slab_stats);
#ifdef EXTSTORE
struct extstore_stats st;
#endif
#ifndef WIN32 #ifndef WIN32
struct rusage usage; struct rusage usage;
getrusage(RUSAGE_SELF, &usage); getrusage(RUSAGE_SELF, &usage);
#endif /* !WIN32 */ #endif /* !WIN32 */
STATS_LOCK(); STATS_LOCK();
APPEND_STAT("pid", "%lu", (long)pid); APPEND_STAT("pid", "%lu", (long)pid);
APPEND_STAT("uptime", "%u", now - ITEM_UPDATE_INTERVAL); APPEND_STAT("uptime", "%u", now - ITEM_UPDATE_INTERVAL);
APPEND_STAT("time", "%ld", now + (long)process_started); APPEND_STAT("time", "%ld", now + (long)process_started);
skipping to change at line 1805 skipping to change at line 1781
APPEND_STAT("lru_maintainer_juggles", "%llu", (unsigned long long)stats. lru_maintainer_juggles); APPEND_STAT("lru_maintainer_juggles", "%llu", (unsigned long long)stats. lru_maintainer_juggles);
} }
APPEND_STAT("malloc_fails", "%llu", APPEND_STAT("malloc_fails", "%llu",
(unsigned long long)stats.malloc_fails); (unsigned long long)stats.malloc_fails);
APPEND_STAT("log_worker_dropped", "%llu", (unsigned long long)stats.log_work er_dropped); APPEND_STAT("log_worker_dropped", "%llu", (unsigned long long)stats.log_work er_dropped);
APPEND_STAT("log_worker_written", "%llu", (unsigned long long)stats.log_work er_written); APPEND_STAT("log_worker_written", "%llu", (unsigned long long)stats.log_work er_written);
APPEND_STAT("log_watcher_skipped", "%llu", (unsigned long long)stats.log_wat cher_skipped); APPEND_STAT("log_watcher_skipped", "%llu", (unsigned long long)stats.log_wat cher_skipped);
APPEND_STAT("log_watcher_sent", "%llu", (unsigned long long)stats.log_watche r_sent); APPEND_STAT("log_watcher_sent", "%llu", (unsigned long long)stats.log_watche r_sent);
STATS_UNLOCK(); STATS_UNLOCK();
#ifdef EXTSTORE #ifdef EXTSTORE
if (c->thread->storage) { storage_stats(add_stats, c);
STATS_LOCK();
APPEND_STAT("extstore_compact_lost", "%llu", (unsigned long long)stats.e
xtstore_compact_lost);
APPEND_STAT("extstore_compact_rescues", "%llu", (unsigned long long)stat
s.extstore_compact_rescues);
APPEND_STAT("extstore_compact_skipped", "%llu", (unsigned long long)stat
s.extstore_compact_skipped);
STATS_UNLOCK();
extstore_get_stats(c->thread->storage, &st);
APPEND_STAT("extstore_page_allocs", "%llu", (unsigned long long)st.page_
allocs);
APPEND_STAT("extstore_page_evictions", "%llu", (unsigned long long)st.pa
ge_evictions);
APPEND_STAT("extstore_page_reclaims", "%llu", (unsigned long long)st.pag
e_reclaims);
APPEND_STAT("extstore_pages_free", "%llu", (unsigned long long)st.pages_
free);
APPEND_STAT("extstore_pages_used", "%llu", (unsigned long long)st.pages_
used);
APPEND_STAT("extstore_objects_evicted", "%llu", (unsigned long long)st.o
bjects_evicted);
APPEND_STAT("extstore_objects_read", "%llu", (unsigned long long)st.obje
cts_read);
APPEND_STAT("extstore_objects_written", "%llu", (unsigned long long)st.o
bjects_written);
APPEND_STAT("extstore_objects_used", "%llu", (unsigned long long)st.obje
cts_used);
APPEND_STAT("extstore_bytes_evicted", "%llu", (unsigned long long)st.byt
es_evicted);
APPEND_STAT("extstore_bytes_written", "%llu", (unsigned long long)st.byt
es_written);
APPEND_STAT("extstore_bytes_read", "%llu", (unsigned long long)st.bytes_
read);
APPEND_STAT("extstore_bytes_used", "%llu", (unsigned long long)st.bytes_
used);
APPEND_STAT("extstore_bytes_fragmented", "%llu", (unsigned long long)st.
bytes_fragmented);
APPEND_STAT("extstore_limit_maxbytes", "%llu", (unsigned long long)(st.p
age_count * st.page_size));
APPEND_STAT("extstore_io_queue", "%llu", (unsigned long long)(st.io_queu
e));
}
#endif #endif
#ifdef TLS #ifdef TLS
if (settings.ssl_enabled) { if (settings.ssl_enabled) {
if (settings.ssl_session_cache) { if (settings.ssl_session_cache) {
APPEND_STAT("ssl_new_sessions", "%llu", (unsigned long long)stats.ss l_new_sessions); APPEND_STAT("ssl_new_sessions", "%llu", (unsigned long long)stats.ss l_new_sessions);
} }
APPEND_STAT("ssl_handshake_errors", "%llu", (unsigned long long)stats.ss l_handshake_errors); APPEND_STAT("ssl_handshake_errors", "%llu", (unsigned long long)stats.ss l_handshake_errors);
APPEND_STAT("time_since_server_cert_refresh", "%u", now - settings.ssl_l ast_cert_refresh_time); APPEND_STAT("time_since_server_cert_refresh", "%u", now - settings.ssl_l ast_cert_refresh_time);
} }
#endif #endif
APPEND_STAT("unexpected_napi_ids", "%llu", (unsigned long long)stats.unexpec
ted_napi_ids);
APPEND_STAT("round_robin_fallback", "%llu", (unsigned long long)stats.round_
robin_fallback);
} }
void process_stat_settings(ADD_STAT add_stats, void *c) { void process_stat_settings(ADD_STAT add_stats, void *c) {
assert(add_stats); assert(add_stats);
APPEND_STAT("maxbytes", "%llu", (unsigned long long)settings.maxbytes); APPEND_STAT("maxbytes", "%llu", (unsigned long long)settings.maxbytes);
APPEND_STAT("maxconns", "%d", settings.maxconns); APPEND_STAT("maxconns", "%d", settings.maxconns);
APPEND_STAT("tcpport", "%d", settings.port); APPEND_STAT("tcpport", "%d", settings.port);
APPEND_STAT("udpport", "%d", settings.udpport); APPEND_STAT("udpport", "%d", settings.udpport);
APPEND_STAT("inter", "%s", settings.inter ? settings.inter : "NULL"); APPEND_STAT("inter", "%s", settings.inter ? settings.inter : "NULL");
APPEND_STAT("verbosity", "%d", settings.verbose); APPEND_STAT("verbosity", "%d", settings.verbose);
skipping to change at line 1923 skipping to change at line 1878
APPEND_STAT("ssl_enabled", "%s", settings.ssl_enabled ? "yes" : "no"); APPEND_STAT("ssl_enabled", "%s", settings.ssl_enabled ? "yes" : "no");
APPEND_STAT("ssl_chain_cert", "%s", settings.ssl_chain_cert); APPEND_STAT("ssl_chain_cert", "%s", settings.ssl_chain_cert);
APPEND_STAT("ssl_key", "%s", settings.ssl_key); APPEND_STAT("ssl_key", "%s", settings.ssl_key);
APPEND_STAT("ssl_verify_mode", "%d", settings.ssl_verify_mode); APPEND_STAT("ssl_verify_mode", "%d", settings.ssl_verify_mode);
APPEND_STAT("ssl_keyformat", "%d", settings.ssl_keyformat); APPEND_STAT("ssl_keyformat", "%d", settings.ssl_keyformat);
APPEND_STAT("ssl_ciphers", "%s", settings.ssl_ciphers ? settings.ssl_ciphers : "NULL"); APPEND_STAT("ssl_ciphers", "%s", settings.ssl_ciphers ? settings.ssl_ciphers : "NULL");
APPEND_STAT("ssl_ca_cert", "%s", settings.ssl_ca_cert ? settings.ssl_ca_cert : "NULL"); APPEND_STAT("ssl_ca_cert", "%s", settings.ssl_ca_cert ? settings.ssl_ca_cert : "NULL");
APPEND_STAT("ssl_wbuf_size", "%u", settings.ssl_wbuf_size); APPEND_STAT("ssl_wbuf_size", "%u", settings.ssl_wbuf_size);
APPEND_STAT("ssl_session_cache", "%s", settings.ssl_session_cache ? "yes" : "no"); APPEND_STAT("ssl_session_cache", "%s", settings.ssl_session_cache ? "yes" : "no");
#endif #endif
APPEND_STAT("num_napi_ids", "%s", settings.num_napi_ids);
APPEND_STAT("memory_file", "%s", settings.memory_file);
} }
static int nz_strcmp(int nzlength, const char *nz, const char *z) { static int nz_strcmp(int nzlength, const char *nz, const char *z) {
int zlength=strlen(z); int zlength=strlen(z);
return (zlength == nzlength) && (strncmp(nz, z, zlength) == 0) ? 0 : -1; return (zlength == nzlength) && (strncmp(nz, z, zlength) == 0) ? 0 : -1;
} }
bool get_stats(const char *stat_type, int nkey, ADD_STAT add_stats, void *c) { bool get_stats(const char *stat_type, int nkey, ADD_STAT add_stats, void *c) {
bool ret = true; bool ret = true;
skipping to change at line 2109 skipping to change at line 2066
APPEND_NUM_STAT(i, "listen_addr", "%s", svr_addr); APPEND_NUM_STAT(i, "listen_addr", "%s", svr_addr);
} }
APPEND_NUM_STAT(i, "state", "%s", APPEND_NUM_STAT(i, "state", "%s",
state_text(conns[i]->state)); state_text(conns[i]->state));
APPEND_NUM_STAT(i, "secs_since_last_cmd", "%d", APPEND_NUM_STAT(i, "secs_since_last_cmd", "%d",
current_time - conns[i]->last_cmd_time); current_time - conns[i]->last_cmd_time);
} }
} }
} }
} }
#ifdef EXTSTORE
void process_extstore_stats(ADD_STAT add_stats, conn *c) {
int i;
char key_str[STAT_KEY_LEN];
char val_str[STAT_VAL_LEN];
int klen = 0, vlen = 0;
struct extstore_stats st;
assert(add_stats);
void *storage = c->thread->storage;
extstore_get_stats(storage, &st);
st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data));
extstore_get_page_data(storage, &st);
for (i = 0; i < st.page_count; i++) {
APPEND_NUM_STAT(i, "version", "%llu",
(unsigned long long) st.page_data[i].version);
APPEND_NUM_STAT(i, "bytes", "%llu",
(unsigned long long) st.page_data[i].bytes_used);
APPEND_NUM_STAT(i, "bucket", "%u",
st.page_data[i].bucket);
APPEND_NUM_STAT(i, "free_bucket", "%u",
st.page_data[i].free_bucket);
}
}
#endif
#define IT_REFCOUNT_LIMIT 60000 #define IT_REFCOUNT_LIMIT 60000
item* limited_get(char *key, size_t nkey, conn *c, uint32_t exptime, bool should _touch, bool do_update, bool *overflow) { item* limited_get(char *key, size_t nkey, conn *c, uint32_t exptime, bool should _touch, bool do_update, bool *overflow) {
item *it; item *it;
if (should_touch) { if (should_touch) {
it = item_touch(key, nkey, exptime, c); it = item_touch(key, nkey, exptime, c);
} else { } else {
it = item_get(key, nkey, c, do_update); it = item_get(key, nkey, c, do_update);
} }
if (it && it->refcount > IT_REFCOUNT_LIMIT) { if (it && it->refcount > IT_REFCOUNT_LIMIT) {
skipping to change at line 2173 skipping to change at line 2103
do_item_remove(it); do_item_remove(it);
it = NULL; it = NULL;
item_unlock(*hv); item_unlock(*hv);
*overflow = true; *overflow = true;
} else { } else {
*overflow = false; *overflow = false;
} }
return it; return it;
} }
#ifdef EXTSTORE
// FIXME: This runs in the IO thread. to get better IO performance this should
// simply mark the io wrapper with the return value and decrement wrapleft, if
// zero redispatching. Still a bit of work being done in the side thread but
// minimized at least.
static void _get_extstore_cb(void *e, obj_io *io, int ret) {
// FIXME: assumes success
io_wrap *wrap = (io_wrap *)io->data;
mc_resp *resp = wrap->resp;
conn *c = wrap->c;
assert(wrap->active == true);
item *read_it = (item *)io->buf;
bool miss = false;
// TODO: How to do counters for hit/misses?
if (ret < 1) {
miss = true;
} else {
uint32_t crc2;
uint32_t crc = (uint32_t) read_it->exptime;
int x;
// item is chunked, crc the iov's
if (io->iov != NULL) {
// first iov is the header, which we don't use beyond crc
crc2 = crc32c(0, (char *)io->iov[0].iov_base+STORE_OFFSET, io->iov[0
].iov_len-STORE_OFFSET);
// make sure it's not sent. hack :(
io->iov[0].iov_len = 0;
for (x = 1; x < io->iovcnt; x++) {
crc2 = crc32c(crc2, (char *)io->iov[x].iov_base, io->iov[x].iov_
len);
}
} else {
crc2 = crc32c(0, (char *)read_it+STORE_OFFSET, io->len-STORE_OFFSET)
;
}
if (crc != crc2) {
miss = true;
wrap->badcrc = true;
}
}
if (miss) {
if (wrap->noreply) {
// In all GET cases, noreply means we send nothing back.
resp->skip = true;
} else {
// TODO: This should be movable to the worker thread.
// Convert the binprot response into a miss response.
// The header requires knowing a bunch of stateful crap, so rather
// than simply writing out a "new" miss response we mangle what's
// already there.
if (c->protocol == binary_prot) {
protocol_binary_response_header *header =
(protocol_binary_response_header *)resp->wbuf;
// cut the extra nbytes off of the body_len
uint32_t body_len = ntohl(header->response.bodylen);
uint8_t hdr_len = header->response.extlen;
body_len -= resp->iov[wrap->iovec_data].iov_len + hdr_len;
resp->tosend -= resp->iov[wrap->iovec_data].iov_len + hdr_len;
header->response.extlen = 0;
header->response.status = (uint16_t)htons(PROTOCOL_BINARY_RESPON
SE_KEY_ENOENT);
header->response.bodylen = htonl(body_len);
// truncate the data response.
resp->iov[wrap->iovec_data].iov_len = 0;
// wipe the extlen iov... wish it was just a flat buffer.
resp->iov[wrap->iovec_data-1].iov_len = 0;
resp->chunked_data_iov = 0;
} else {
int i;
// Meta commands have EN status lines for miss, rather than
// END as a trailer as per normal ascii.
if (resp->iov[0].iov_len >= 3
&& memcmp(resp->iov[0].iov_base, "VA ", 3) == 0) {
// TODO: These miss translators should use specific callback
// functions attached to the io wrap. This is weird :(
resp->iovcnt = 1;
resp->iov[0].iov_len = 4;
resp->iov[0].iov_base = "EN\r\n";
resp->tosend = 4;
} else {
// Wipe the iovecs up through our data injection.
// Allows trailers to be returned (END)
for (i = 0; i <= wrap->iovec_data; i++) {
resp->tosend -= resp->iov[i].iov_len;
resp->iov[i].iov_len = 0;
resp->iov[i].iov_base = NULL;
}
}
resp->chunked_total = 0;
resp->chunked_data_iov = 0;
}
}
wrap->miss = true;
} else {
assert(read_it->slabs_clsid != 0);
// TODO: should always use it instead of ITEM_data to kill more
// chunked special casing.
if ((read_it->it_flags & ITEM_CHUNKED) == 0) {
resp->iov[wrap->iovec_data].iov_base = ITEM_data(read_it);
}
wrap->miss = false;
}
c->io_wrapleft--;
wrap->active = false;
//assert(c->io_wrapleft >= 0);
// All IO's have returned, lets re-attach this connection to our original
// thread.
if (c->io_wrapleft == 0) {
assert(c->io_queued == true);
c->io_queued = false;
redispatch_conn(c);
}
}
int _get_extstore(conn *c, item *it, mc_resp *resp) {
#ifdef NEED_ALIGN
item_hdr hdr;
memcpy(&hdr, ITEM_data(it), sizeof(hdr));
#else
item_hdr *hdr = (item_hdr *)ITEM_data(it);
#endif
size_t ntotal = ITEM_ntotal(it);
unsigned int clsid = slabs_clsid(ntotal);
item *new_it;
bool chunked = false;
if (ntotal > settings.slab_chunk_size_max) {
// Pull a chunked item header.
uint32_t flags;
FLAGS_CONV(it, flags);
new_it = item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, it->nbyt
es);
assert(new_it == NULL || (new_it->it_flags & ITEM_CHUNKED));
chunked = true;
} else {
new_it = do_item_alloc_pull(ntotal, clsid);
}
if (new_it == NULL)
return -1;
assert(!c->io_queued); // FIXME: debugging.
// so we can free the chunk on a miss
new_it->slabs_clsid = clsid;
io_wrap *io = do_cache_alloc(c->thread->io_cache);
io->active = true;
io->miss = false;
io->badcrc = false;
io->noreply = c->noreply;
// io_wrap owns the reference for this object now.
io->hdr_it = it;
io->resp = resp;
io->io.iov = NULL;
// FIXME: error handling.
if (chunked) {
unsigned int ciovcnt = 0;
size_t remain = new_it->nbytes;
item_chunk *chunk = (item_chunk *) ITEM_schunk(new_it);
// TODO: This might make sense as a _global_ cache vs a per-thread.
// but we still can't load objects requiring > IOV_MAX iovs.
// In the meantime, these objects are rare/slow enough that
// malloc/freeing a statically sized object won't cause us much pain.
io->io.iov = malloc(sizeof(struct iovec) * IOV_MAX);
if (io->io.iov == NULL) {
item_remove(new_it);
do_cache_free(c->thread->io_cache, io);
return -1;
}
// fill the header so we can get the full data + crc back.
io->io.iov[0].iov_base = new_it;
io->io.iov[0].iov_len = ITEM_ntotal(new_it) - new_it->nbytes;
ciovcnt++;
while (remain > 0) {
chunk = do_item_alloc_chunk(chunk, remain);
// FIXME: _pure evil_, silently erroring if item is too large.
if (chunk == NULL || ciovcnt > IOV_MAX-1) {
item_remove(new_it);
free(io->io.iov);
// TODO: wrapper function for freeing up an io wrap?
io->io.iov = NULL;
do_cache_free(c->thread->io_cache, io);
return -1;
}
io->io.iov[ciovcnt].iov_base = chunk->data;
io->io.iov[ciovcnt].iov_len = (remain < chunk->size) ? remain : chun
k->size;
chunk->used = (remain < chunk->size) ? remain : chunk->size;
remain -= chunk->size;
ciovcnt++;
}
io->io.iovcnt = ciovcnt;
}
// Chunked or non chunked we reserve a response iov here.
io->iovec_data = resp->iovcnt;
int iovtotal = (c->protocol == binary_prot) ? it->nbytes - 2 : it->nbytes;
if (chunked) {
resp_add_chunked_iov(resp, new_it, iovtotal);
} else {
resp_add_iov(resp, "", iovtotal);
}
io->io.buf = (void *)new_it;
io->c = c;
// We need to stack the sub-struct IO's together as well.
if (c->io_wraplist) {
io->io.next = &c->io_wraplist->io;
} else {
io->io.next = NULL;
}
// IO queue for this connection.
io->next = c->io_wraplist;
c->io_wraplist = io;
assert(c->io_wrapleft >= 0);
c->io_wrapleft++;
// reference ourselves for the callback.
io->io.data = (void *)io;
// Now, fill in io->io based on what was in our header.
#ifdef NEED_ALIGN
io->io.page_version = hdr.page_version;
io->io.page_id = hdr.page_id;
io->io.offset = hdr.offset;
#else
io->io.page_version = hdr->page_version;
io->io.page_id = hdr->page_id;
io->io.offset = hdr->offset;
#endif
io->io.len = ntotal;
io->io.mode = OBJ_IO_READ;
io->io.cb = _get_extstore_cb;
//fprintf(stderr, "EXTSTORE: IO stacked %u\n", io->iovec_data);
// FIXME: This stat needs to move to reflect # of flash hits vs misses
// for now it's a good gauge on how often we request out to flash at
// least.
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.get_extstore++;
pthread_mutex_unlock(&c->thread->stats.mutex);
return 0;
}
#endif
/* /*
* adds a delta value to a numeric item. * adds a delta value to a numeric item.
* *
* c connection requesting the operation * c connection requesting the operation
* it item to adjust * it item to adjust
* incr true to increment value, false to decrement * incr true to increment value, false to decrement
* delta amount to adjust value by * delta amount to adjust value by
* buf buffer for response string * buf buffer for response string
* *
* returns a response string to send back to the client. * returns a response string to send back to the client.
skipping to change at line 3010 skipping to change at line 2691
// clear the message and initialize it. // clear the message and initialize it.
memset(&msg, 0, sizeof(struct msghdr)); memset(&msg, 0, sizeof(struct msghdr));
msg.msg_iov = iovs; msg.msg_iov = iovs;
// the UDP source to return to. // the UDP source to return to.
msg.msg_name = &resp->request_addr; msg.msg_name = &resp->request_addr;
msg.msg_namelen = resp->request_addr_size; msg.msg_namelen = resp->request_addr_size;
// First IOV is the custom UDP header. // First IOV is the custom UDP header.
iovs[0].iov_base = udp_hdr; iovs[0].iov_base = (void *)udp_hdr;
iovs[0].iov_len = UDP_HEADER_SIZE; iovs[0].iov_len = UDP_HEADER_SIZE;
build_udp_header(udp_hdr, resp); build_udp_header(udp_hdr, resp);
iovused++; iovused++;
// Fill the IOV's the standard way. // Fill the IOV's the standard way.
// TODO: might get a small speedup if we let it break early with a length // TODO: might get a small speedup if we let it break early with a length
// limit. // limit.
iovused = _transmit_pre(c, iovs, iovused, TRANSMIT_ONE_RESP); iovused = _transmit_pre(c, iovs, iovused, TRANSMIT_ONE_RESP);
// Clip the IOV's to the max UDP packet size. // Clip the IOV's to the max UDP packet size.
skipping to change at line 3220 skipping to change at line 2901
if (!use_accept4) { if (!use_accept4) {
if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) { if (fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL) | O_NONBLOCK) < 0) {
perror("setting O_NONBLOCK"); perror("setting O_NONBLOCK");
close(sfd); close(sfd);
break; break;
} }
} }
bool reject; bool reject;
if (settings.maxconns_fast) { if (settings.maxconns_fast) {
STATS_LOCK(); reject = sfd >= settings.maxconns - 1;
reject = stats_state.curr_conns + stats_state.reserved_fds >= se
ttings.maxconns - 1;
if (reject) { if (reject) {
STATS_LOCK();
stats.rejected_conns++; stats.rejected_conns++;
STATS_UNLOCK();
} }
STATS_UNLOCK();
} else { } else {
reject = false; reject = false;
} }
if (reject) { if (reject) {
str = "ERROR Too many open connections\r\n"; str = "ERROR Too many open connections\r\n";
res = write(sfd, str, strlen(str)); res = write(sfd, str, strlen(str));
close(sfd); close(sfd);
} else { } else {
void *ssl_v = NULL; void *ssl_v = NULL;
skipping to change at line 3509 skipping to change at line 3190
break; break;
} }
/* otherwise we have a real error, on which we close the connection */ /* otherwise we have a real error, on which we close the connection */
if (settings.verbose > 0) if (settings.verbose > 0)
fprintf(stderr, "Failed to read, and not due to blocking\n"); fprintf(stderr, "Failed to read, and not due to blocking\n");
conn_set_state(c, conn_closing); conn_set_state(c, conn_closing);
break; break;
case conn_write: case conn_write:
case conn_mwrite: case conn_mwrite:
#ifdef EXTSTORE
/* have side IO's that must process before transmit() can run. /* have side IO's that must process before transmit() can run.
* remove the connection from the worker thread and dispatch the * remove the connection from the worker thread and dispatch the
* IO queue * IO queue
*/ */
if (c->io_wrapleft) { if (c->io_queues[0].type != IO_QUEUE_NONE) {
assert(c->io_queued == false); assert(c->io_queues_submitted == 0);
assert(c->io_wraplist != NULL); bool hit = false;
// TODO: create proper state for this condition
conn_set_state(c, conn_watch); for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++
event_del(&c->event); ) {
c->io_queued = true; if (q->count != 0) {
extstore_submit(c->thread->storage, &c->io_wraplist->io); assert(q->stack_ctx != NULL);
stop = true; hit = true;
break; q->submit_cb(q->ctx, q->stack_ctx);
c->io_queues_submitted++;
}
}
if (hit) {
conn_set_state(c, conn_io_queue);
event_del(&c->event);
stop = true;
break;
}
} }
#endif
switch (!IS_UDP(c->transport) ? transmit(c) : transmit_udp(c)) { switch (!IS_UDP(c->transport) ? transmit(c) : transmit_udp(c)) {
case TRANSMIT_COMPLETE: case TRANSMIT_COMPLETE:
if (c->state == conn_mwrite) { if (c->state == conn_mwrite) {
// Free up IO wraps and any half-uploaded items. // Free up IO wraps and any half-uploaded items.
conn_release_items(c); conn_release_items(c);
conn_set_state(c, conn_new_cmd); conn_set_state(c, conn_new_cmd);
if (c->close_after_write) { if (c->close_after_write) {
conn_set_state(c, conn_closing); conn_set_state(c, conn_closing);
} }
} else { } else {
skipping to change at line 3569 skipping to change at line 3258
case conn_closed: case conn_closed:
/* This only happens if dormando is an idiot. */ /* This only happens if dormando is an idiot. */
abort(); abort();
break; break;
case conn_watch: case conn_watch:
/* We handed off our connection to the logger thread. */ /* We handed off our connection to the logger thread. */
stop = true; stop = true;
break; break;
case conn_io_queue:
/* Complete our queued IO's from within the worker thread. */
conn_io_queue_complete(c);
conn_set_state(c, conn_mwrite);
break;
case conn_max_state: case conn_max_state:
assert(false); assert(false);
break; break;
} }
} }
return; return;
} }
void event_handler(const evutil_socket_t fd, const short which, void *arg) { void event_handler(const evutil_socket_t fd, const short which, void *arg) {
skipping to change at line 3708 skipping to change at line 3402
* we make sure at least one works before erroring. * we make sure at least one works before erroring.
*/ */
if (errno == EMFILE) { if (errno == EMFILE) {
/* ...unless we're out of fds */ /* ...unless we're out of fds */
perror("server_socket"); perror("server_socket");
exit(EX_OSERR); exit(EX_OSERR);
} }
continue; continue;
} }
if (settings.num_napi_ids) {
socklen_t len = sizeof(socklen_t);
int napi_id;
error = getsockopt(sfd, SOL_SOCKET, SO_INCOMING_NAPI_ID, &napi_id, &
len);
if (error != 0) {
fprintf(stderr, "-N <num_napi_ids> option not supported\n");
exit(EXIT_FAILURE);
}
}
#ifdef IPV6_V6ONLY #ifdef IPV6_V6ONLY
if (next->ai_family == AF_INET6) { if (next->ai_family == AF_INET6) {
error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags, sizeof(flags)); error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags, sizeof(flags));
if (error != 0) { if (error != 0) {
perror("setsockopt"); perror("setsockopt");
close(sfd); close(sfd);
continue; continue;
} }
} }
#endif #endif
skipping to change at line 4231 skipping to change at line 3935
" or 3(Once)\n"); " or 3(Once)\n");
printf(" - ssl_ciphers: specify cipher list to be used\n" printf(" - ssl_ciphers: specify cipher list to be used\n"
" - ssl_ca_cert: PEM format file of acceptable client CA's\ n" " - ssl_ca_cert: PEM format file of acceptable client CA's\ n"
" - ssl_wbuf_size: size in kilobytes of per-connection SSL ou tput buffer\n" " - ssl_wbuf_size: size in kilobytes of per-connection SSL ou tput buffer\n"
" (default: %u)\n", settings.ssl_wbuf_size / (1 << 10)); " (default: %u)\n", settings.ssl_wbuf_size / (1 << 10));
printf(" - ssl_session_cache: enable server-side SSL session cache, to s upport session\n" printf(" - ssl_session_cache: enable server-side SSL session cache, to s upport session\n"
" resumption\n"); " resumption\n");
verify_default("ssl_keyformat", settings.ssl_keyformat == SSL_FILETYPE_PEM); verify_default("ssl_keyformat", settings.ssl_keyformat == SSL_FILETYPE_PEM);
verify_default("ssl_verify_mode", settings.ssl_verify_mode == SSL_VERIFY_NON E); verify_default("ssl_verify_mode", settings.ssl_verify_mode == SSL_VERIFY_NON E);
#endif #endif
printf("-N, --napi_ids number of napi ids. see doc/napi_ids.txt f or more details\n");
return; return;
} }
static void usage_license(void) { static void usage_license(void) {
printf(PACKAGE " " VERSION "\n\n"); printf(PACKAGE " " VERSION "\n\n");
printf( printf(
"Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n" "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
"All rights reserved.\n" "All rights reserved.\n"
"\n" "\n"
"Redistribution and use in source and binary forms, with or without\n" "Redistribution and use in source and binary forms, with or without\n"
skipping to change at line 4790 skipping to change at line 4495
} }
int main (int argc, char **argv) { int main (int argc, char **argv) {
int c; int c;
bool lock_memory = false; bool lock_memory = false;
bool do_daemonize = false; bool do_daemonize = false;
bool preallocate = false; bool preallocate = false;
int maxcore = 0; int maxcore = 0;
char *username = NULL; char *username = NULL;
char *pid_file = NULL; char *pid_file = NULL;
char *memory_file = NULL;
struct passwd *pw; struct passwd *pw;
struct rlimit rlim; struct rlimit rlim;
char *buf; char *buf;
char unit = '\0'; char unit = '\0';
int size_max = 0; int size_max = 0;
int retval = EXIT_SUCCESS; int retval = EXIT_SUCCESS;
bool protocol_specified = false; bool protocol_specified = false;
bool tcp_specified = false; bool tcp_specified = false;
bool udp_specified = false; bool udp_specified = false;
bool start_lru_maintainer = true; bool start_lru_maintainer = true;
skipping to change at line 4813 skipping to change at line 4517
enum hashfunc_type hash_type = MURMUR3_HASH; enum hashfunc_type hash_type = MURMUR3_HASH;
uint32_t tocrawl; uint32_t tocrawl;
uint32_t slab_sizes[MAX_NUMBER_OF_SLAB_CLASSES]; uint32_t slab_sizes[MAX_NUMBER_OF_SLAB_CLASSES];
bool use_slab_sizes = false; bool use_slab_sizes = false;
char *slab_sizes_unparsed = NULL; char *slab_sizes_unparsed = NULL;
bool slab_chunk_size_changed = false; bool slab_chunk_size_changed = false;
// struct for restart code. Initialized up here so we can curry // struct for restart code. Initialized up here so we can curry
// important settings to save or validate. // important settings to save or validate.
struct _mc_meta_data *meta = malloc(sizeof(struct _mc_meta_data)); struct _mc_meta_data *meta = malloc(sizeof(struct _mc_meta_data));
meta->slab_config = NULL; meta->slab_config = NULL;
#ifdef EXTSTORE
void *storage = NULL;
struct extstore_conf_file *storage_file = NULL;
struct extstore_conf ext_cf;
#endif
char *subopts, *subopts_orig; char *subopts, *subopts_orig;
char *subopts_value; char *subopts_value;
enum { enum {
MAXCONNS_FAST = 0, MAXCONNS_FAST = 0,
HASHPOWER_INIT, HASHPOWER_INIT,
NO_HASHEXPAND, NO_HASHEXPAND,
SLAB_REASSIGN, SLAB_REASSIGN,
SLAB_AUTOMOVE, SLAB_AUTOMOVE,
SLAB_AUTOMOVE_RATIO, SLAB_AUTOMOVE_RATIO,
SLAB_AUTOMOVE_WINDOW, SLAB_AUTOMOVE_WINDOW,
skipping to change at line 4872 skipping to change at line 4571
SSL_VERIFY_MODE, SSL_VERIFY_MODE,
SSL_KEYFORM, SSL_KEYFORM,
SSL_CIPHERS, SSL_CIPHERS,
SSL_CA_CERT, SSL_CA_CERT,
SSL_WBUF_SIZE, SSL_WBUF_SIZE,
SSL_SESSION_CACHE, SSL_SESSION_CACHE,
#endif #endif
#ifdef MEMCACHED_DEBUG #ifdef MEMCACHED_DEBUG
RELAXED_PRIVILEGES, RELAXED_PRIVILEGES,
#endif #endif
#ifdef EXTSTORE
EXT_PAGE_SIZE,
EXT_WBUF_SIZE,
EXT_THREADS,
EXT_IO_DEPTH,
EXT_PATH,
EXT_ITEM_SIZE,
EXT_ITEM_AGE,
EXT_LOW_TTL,
EXT_RECACHE_RATE,
EXT_COMPACT_UNDER,
EXT_DROP_UNDER,
EXT_MAX_FRAG,
EXT_DROP_UNREAD,
SLAB_AUTOMOVE_FREERATIO,
#endif
}; };
char *const subopts_tokens[] = { char *const subopts_tokens[] = {
[MAXCONNS_FAST] = "maxconns_fast", [MAXCONNS_FAST] = "maxconns_fast",
[HASHPOWER_INIT] = "hashpower", [HASHPOWER_INIT] = "hashpower",
[NO_HASHEXPAND] = "no_hashexpand", [NO_HASHEXPAND] = "no_hashexpand",
[SLAB_REASSIGN] = "slab_reassign", [SLAB_REASSIGN] = "slab_reassign",
[SLAB_AUTOMOVE] = "slab_automove", [SLAB_AUTOMOVE] = "slab_automove",
[SLAB_AUTOMOVE_RATIO] = "slab_automove_ratio", [SLAB_AUTOMOVE_RATIO] = "slab_automove_ratio",
[SLAB_AUTOMOVE_WINDOW] = "slab_automove_window", [SLAB_AUTOMOVE_WINDOW] = "slab_automove_window",
[TAIL_REPAIR_TIME] = "tail_repair_time", [TAIL_REPAIR_TIME] = "tail_repair_time",
skipping to change at line 4941 skipping to change at line 4624
[SSL_VERIFY_MODE] = "ssl_verify_mode", [SSL_VERIFY_MODE] = "ssl_verify_mode",
[SSL_KEYFORM] = "ssl_keyformat", [SSL_KEYFORM] = "ssl_keyformat",
[SSL_CIPHERS] = "ssl_ciphers", [SSL_CIPHERS] = "ssl_ciphers",
[SSL_CA_CERT] = "ssl_ca_cert", [SSL_CA_CERT] = "ssl_ca_cert",
[SSL_WBUF_SIZE] = "ssl_wbuf_size", [SSL_WBUF_SIZE] = "ssl_wbuf_size",
[SSL_SESSION_CACHE] = "ssl_session_cache", [SSL_SESSION_CACHE] = "ssl_session_cache",
#endif #endif
#ifdef MEMCACHED_DEBUG #ifdef MEMCACHED_DEBUG
[RELAXED_PRIVILEGES] = "relaxed_privileges", [RELAXED_PRIVILEGES] = "relaxed_privileges",
#endif #endif
#ifdef EXTSTORE
[EXT_PAGE_SIZE] = "ext_page_size",
[EXT_WBUF_SIZE] = "ext_wbuf_size",
[EXT_THREADS] = "ext_threads",
[EXT_IO_DEPTH] = "ext_io_depth",
[EXT_PATH] = "ext_path",
[EXT_ITEM_SIZE] = "ext_item_size",
[EXT_ITEM_AGE] = "ext_item_age",
[EXT_LOW_TTL] = "ext_low_ttl",
[EXT_RECACHE_RATE] = "ext_recache_rate",
[EXT_COMPACT_UNDER] = "ext_compact_under",
[EXT_DROP_UNDER] = "ext_drop_under",
[EXT_MAX_FRAG] = "ext_max_frag",
[EXT_DROP_UNREAD] = "ext_drop_unread",
[SLAB_AUTOMOVE_FREERATIO] = "slab_automove_freeratio",
#endif
NULL NULL
}; };
if (!sanitycheck()) { if (!sanitycheck()) {
free(meta); free(meta);
return EX_OSERR; return EX_OSERR;
} }
/* handle SIGINT, SIGTERM */ /* handle SIGINT, SIGTERM */
signal(SIGINT, sig_handler); signal(SIGINT, sig_handler);
signal(SIGTERM, sig_handler); signal(SIGTERM, sig_handler);
signal(SIGHUP, sighup_handler); signal(SIGHUP, sighup_handler);
signal(SIGUSR1, sig_usrhandler); signal(SIGUSR1, sig_usrhandler);
/* init settings */ /* init settings */
settings_init(); settings_init();
verify_default("hash_algorithm", hash_type == MURMUR3_HASH); verify_default("hash_algorithm", hash_type == MURMUR3_HASH);
#ifdef EXTSTORE #ifdef EXTSTORE
settings.ext_item_size = 512; void *storage = NULL;
settings.ext_item_age = UINT_MAX; void *storage_cf = storage_init_config(&settings);
settings.ext_low_ttl = 0; bool storage_enabled = false;
settings.ext_recache_rate = 2000; if (storage_cf == NULL) {
settings.ext_max_frag = 0.8; fprintf(stderr, "failed to allocate extstore config\n");
settings.ext_drop_unread = false; return 1;
settings.ext_wbuf_size = 1024 * 1024 * 4; }
settings.ext_compact_under = 0;
settings.ext_drop_under = 0;
settings.slab_automove_freeratio = 0.01;
settings.ext_page_size = 1024 * 1024 * 64;
settings.ext_io_threadcount = 1;
ext_cf.page_size = settings.ext_page_size;
ext_cf.wbuf_size = settings.ext_wbuf_size;
ext_cf.io_threadcount = settings.ext_io_threadcount;
ext_cf.io_depth = 1;
ext_cf.page_buckets = 4;
ext_cf.wbuf_count = ext_cf.page_buckets;
#endif #endif
/* Run regardless of initializing it later */ /* Run regardless of initializing it later */
init_lru_maintainer(); init_lru_maintainer();
/* set stderr non-buffering (for running under, say, daemontools) */ /* set stderr non-buffering (for running under, say, daemontools) */
setbuf(stderr, NULL); setbuf(stderr, NULL);
char *shortopts = char *shortopts =
"a:" /* access mask for unix socket */ "a:" /* access mask for unix socket */
skipping to change at line 5036 skipping to change at line 4692
"b:" /* backlog queue limit */ "b:" /* backlog queue limit */
"B:" /* Binding protocol */ "B:" /* Binding protocol */
"I:" /* Max item size */ "I:" /* Max item size */
"S" /* Sasl ON */ "S" /* Sasl ON */
"F" /* Disable flush_all */ "F" /* Disable flush_all */
"X" /* Disable dump commands */ "X" /* Disable dump commands */
"W" /* Disable watch commands */ "W" /* Disable watch commands */
"Y:" /* Enable token auth */ "Y:" /* Enable token auth */
"e:" /* mmap path for external item memory */ "e:" /* mmap path for external item memory */
"o:" /* Extended generic options */ "o:" /* Extended generic options */
"N:" /* NAPI ID based thread selection */
; ;
/* process arguments */ /* process arguments */
#ifdef HAVE_GETOPT_LONG #ifdef HAVE_GETOPT_LONG
const struct option longopts[] = { const struct option longopts[] = {
{"unix-mask", required_argument, 0, 'a'}, {"unix-mask", required_argument, 0, 'a'},
{"enable-shutdown", no_argument, 0, 'A'}, {"enable-shutdown", no_argument, 0, 'A'},
{"enable-ssl", no_argument, 0, 'Z'}, {"enable-ssl", no_argument, 0, 'Z'},
{"port", required_argument, 0, 'p'}, {"port", required_argument, 0, 'p'},
{"unix-socket", required_argument, 0, 's'}, {"unix-socket", required_argument, 0, 's'},
skipping to change at line 5076 skipping to change at line 4733
{"listen-backlog", required_argument, 0, 'b'}, {"listen-backlog", required_argument, 0, 'b'},
{"protocol", required_argument, 0, 'B'}, {"protocol", required_argument, 0, 'B'},
{"max-item-size", required_argument, 0, 'I'}, {"max-item-size", required_argument, 0, 'I'},
{"enable-sasl", no_argument, 0, 'S'}, {"enable-sasl", no_argument, 0, 'S'},
{"disable-flush-all", no_argument, 0, 'F'}, {"disable-flush-all", no_argument, 0, 'F'},
{"disable-dumping", no_argument, 0, 'X'}, {"disable-dumping", no_argument, 0, 'X'},
{"disable-watch", no_argument, 0, 'W'}, {"disable-watch", no_argument, 0, 'W'},
{"auth-file", required_argument, 0, 'Y'}, {"auth-file", required_argument, 0, 'Y'},
{"memory-file", required_argument, 0, 'e'}, {"memory-file", required_argument, 0, 'e'},
{"extended", required_argument, 0, 'o'}, {"extended", required_argument, 0, 'o'},
{"napi-ids", required_argument, 0, 'N'},
{0, 0, 0, 0} {0, 0, 0, 0}
}; };
int optindex; int optindex;
while (-1 != (c = getopt_long(argc, argv, shortopts, while (-1 != (c = getopt_long(argc, argv, shortopts,
longopts, &optindex))) { longopts, &optindex))) {
#else #else
while (-1 != (c = getopt(argc, argv, shortopts))) { while (-1 != (c = getopt(argc, argv, shortopts))) {
#endif #endif
switch (c) { switch (c) {
case 'A': case 'A':
skipping to change at line 5189 skipping to change at line 4847
return 1; return 1;
} }
break; break;
case 'u': case 'u':
username = optarg; username = optarg;
break; break;
case 'P': case 'P':
pid_file = optarg; pid_file = optarg;
break; break;
case 'e': case 'e':
memory_file = optarg; settings.memory_file = optarg;
break; break;
case 'f': case 'f':
settings.factor = atof(optarg); settings.factor = atof(optarg);
if (settings.factor <= 1.0) { if (settings.factor <= 1.0) {
fprintf(stderr, "Factor must be greater than 1\n"); fprintf(stderr, "Factor must be greater than 1\n");
return 1; return 1;
} }
meta->slab_config = strdup(optarg); meta->slab_config = strdup(optarg);
break; break;
case 'n': case 'n':
skipping to change at line 5297 skipping to change at line 4955
case 'X' : case 'X' :
settings.dump_enabled = false; settings.dump_enabled = false;
break; break;
case 'W' : case 'W' :
settings.watch_enabled = false; settings.watch_enabled = false;
break; break;
case 'Y' : case 'Y' :
// dupe the file path now just in case the options get mangled. // dupe the file path now just in case the options get mangled.
settings.auth_file = strdup(optarg); settings.auth_file = strdup(optarg);
break; break;
case 'N':
settings.num_napi_ids = atoi(optarg);
if (settings.num_napi_ids <= 0) {
fprintf(stderr, "Maximum number of NAPI IDs must be greater than
0\n");
return 1;
}
break;
case 'o': /* It's sub-opts time! */ case 'o': /* It's sub-opts time! */
subopts_orig = subopts = strdup(optarg); /* getsubopt() changes the original args */ subopts_orig = subopts = strdup(optarg); /* getsubopt() changes the original args */
while (*subopts != '\0') { while (*subopts != '\0') {
// BSD getsubopt (at least) has undefined behavior on -1, so
// if we want to retry the getsubopt call in submodules we
// need an extra layer of string copies.
char *subopts_temp_o = NULL;
char *subopts_temp = subopts_temp_o = strdup(subopts);
switch (getsubopt(&subopts, subopts_tokens, &subopts_value)) { switch (getsubopt(&subopts, subopts_tokens, &subopts_value)) {
case MAXCONNS_FAST: case MAXCONNS_FAST:
settings.maxconns_fast = true; settings.maxconns_fast = true;
break; break;
case HASHPOWER_INIT: case HASHPOWER_INIT:
if (subopts_value == NULL) { if (subopts_value == NULL) {
fprintf(stderr, "Missing numeric argument for hashpower\n"); fprintf(stderr, "Missing numeric argument for hashpower\n");
return 1; return 1;
} }
skipping to change at line 5622 skipping to change at line 5292
if (!safe_strtoul(subopts_value, &settings.ssl_wbuf_size)) { if (!safe_strtoul(subopts_value, &settings.ssl_wbuf_size)) {
fprintf(stderr, "could not parse argument to ssl_wbuf_size\n "); fprintf(stderr, "could not parse argument to ssl_wbuf_size\n ");
return 1; return 1;
} }
settings.ssl_wbuf_size *= 1024; /* kilobytes */ settings.ssl_wbuf_size *= 1024; /* kilobytes */
break; break;
case SSL_SESSION_CACHE: case SSL_SESSION_CACHE:
settings.ssl_session_cache = true; settings.ssl_session_cache = true;
break; break;
#endif #endif
#ifdef EXTSTORE
case EXT_PAGE_SIZE:
if (storage_file) {
fprintf(stderr, "Must specify ext_page_size before any ext_p
ath arguments\n");
return 1;
}
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_page_size argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &ext_cf.page_size)) {
fprintf(stderr, "could not parse argument to ext_page_size\n
");
return 1;
}
ext_cf.page_size *= 1024 * 1024; /* megabytes */
break;
case EXT_WBUF_SIZE:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_wbuf_size argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &ext_cf.wbuf_size)) {
fprintf(stderr, "could not parse argument to ext_wbuf_size\n
");
return 1;
}
ext_cf.wbuf_size *= 1024 * 1024; /* megabytes */
settings.ext_wbuf_size = ext_cf.wbuf_size;
break;
case EXT_THREADS:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_threads argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &ext_cf.io_threadcount)) {
fprintf(stderr, "could not parse argument to ext_threads\n")
;
return 1;
}
break;
case EXT_IO_DEPTH:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_io_depth argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &ext_cf.io_depth)) {
fprintf(stderr, "could not parse argument to ext_io_depth\n"
);
return 1;
}
break;
case EXT_ITEM_SIZE:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_item_size argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &settings.ext_item_size)) {
fprintf(stderr, "could not parse argument to ext_item_size\n
");
return 1;
}
break;
case EXT_ITEM_AGE:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_item_age argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &settings.ext_item_age)) {
fprintf(stderr, "could not parse argument to ext_item_age\n"
);
return 1;
}
break;
case EXT_LOW_TTL:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_low_ttl argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &settings.ext_low_ttl)) {
fprintf(stderr, "could not parse argument to ext_low_ttl\n")
;
return 1;
}
break;
case EXT_RECACHE_RATE:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_recache_rate argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &settings.ext_recache_rate)) {
fprintf(stderr, "could not parse argument to ext_recache_rat
e\n");
return 1;
}
break;
case EXT_COMPACT_UNDER:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_compact_under argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &settings.ext_compact_under)) {
fprintf(stderr, "could not parse argument to ext_compact_und
er\n");
return 1;
}
break;
case EXT_DROP_UNDER:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_drop_under argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &settings.ext_drop_under)) {
fprintf(stderr, "could not parse argument to ext_drop_under\
n");
return 1;
}
break;
case EXT_MAX_FRAG:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_max_frag argument\n");
return 1;
}
if (!safe_strtod(subopts_value, &settings.ext_max_frag)) {
fprintf(stderr, "could not parse argument to ext_max_frag\n"
);
return 1;
}
break;
case SLAB_AUTOMOVE_FREERATIO:
if (subopts_value == NULL) {
fprintf(stderr, "Missing slab_automove_freeratio argument\n"
);
return 1;
}
if (!safe_strtod(subopts_value, &settings.slab_automove_freerati
o)) {
fprintf(stderr, "could not parse argument to slab_automove_f
reeratio\n");
return 1;
}
break;
case EXT_DROP_UNREAD:
settings.ext_drop_unread = true;
break;
case EXT_PATH:
if (subopts_value) {
struct extstore_conf_file *tmp = storage_conf_parse(subopts_
value, ext_cf.page_size);
if (tmp == NULL) {
fprintf(stderr, "failed to parse ext_path argument\n");
return 1;
}
if (storage_file != NULL) {
tmp->next = storage_file;
}
storage_file = tmp;
} else {
fprintf(stderr, "missing argument to ext_path, ie: ext_path=
/d/file:5G\n");
return 1;
}
break;
#endif
case MODERN: case MODERN:
/* currently no new defaults */ /* currently no new defaults */
break; break;
case NO_MODERN: case NO_MODERN:
if (!slab_chunk_size_changed) { if (!slab_chunk_size_changed) {
settings.slab_chunk_size_max = settings.slab_page_size; settings.slab_chunk_size_max = settings.slab_page_size;
} }
settings.slab_reassign = false; settings.slab_reassign = false;
settings.slab_automove = 0; settings.slab_automove = 0;
settings.maxconns_fast = false; settings.maxconns_fast = false;
skipping to change at line 5812 skipping to change at line 5334
return 1; return 1;
} }
settings.read_buf_mem_limit *= 1024 * 1024; /* megabytes */ settings.read_buf_mem_limit *= 1024 * 1024; /* megabytes */
break; break;
#ifdef MEMCACHED_DEBUG #ifdef MEMCACHED_DEBUG
case RELAXED_PRIVILEGES: case RELAXED_PRIVILEGES:
settings.relaxed_privileges = true; settings.relaxed_privileges = true;
break; break;
#endif #endif
default: default:
printf("Illegal suboption \"%s\"\n", subopts_value); #ifdef EXTSTORE
// TODO: differentiating response code.
if (storage_read_config(storage_cf, &subopts_temp)) {
return 1;
}
#else
printf("Illegal suboption \"%s\"\n", subopts_temp);
return 1; return 1;
#endif
} // switch
if (subopts_temp_o) {
free(subopts_temp_o);
} }
} } // while
free(subopts_orig); free(subopts_orig);
break; break;
default: default:
fprintf(stderr, "Illegal argument \"%c\"\n", c); fprintf(stderr, "Illegal argument \"%c\"\n", c);
return 1; return 1;
} }
} }
if (settings.num_napi_ids > settings.num_threads) {
fprintf(stderr, "Number of napi_ids(%d) cannot be greater than number of
threads(%d)\n",
settings.num_napi_ids, settings.num_threads);
exit(EX_USAGE);
}
if (settings.item_size_max < ITEM_SIZE_MAX_LOWER_LIMIT) { if (settings.item_size_max < ITEM_SIZE_MAX_LOWER_LIMIT) {
fprintf(stderr, "Item max size cannot be less than 1024 bytes.\n"); fprintf(stderr, "Item max size cannot be less than 1024 bytes.\n");
exit(EX_USAGE); exit(EX_USAGE);
} }
if (settings.item_size_max > (settings.maxbytes / 2)) { if (settings.item_size_max > (settings.maxbytes / 2)) {
fprintf(stderr, "Cannot set item size limit higher than 1/2 of memory ma x.\n"); fprintf(stderr, "Cannot set item size limit higher than 1/2 of memory ma x.\n");
exit(EX_USAGE); exit(EX_USAGE);
} }
if (settings.item_size_max > (ITEM_SIZE_MAX_UPPER_LIMIT)) { if (settings.item_size_max > (ITEM_SIZE_MAX_UPPER_LIMIT)) {
fprintf(stderr, "Cannot set item size limit higher than a gigabyte.\n"); fprintf(stderr, "Cannot set item size limit higher than a gigabyte.\n");
skipping to change at line 5862 skipping to change at line 5400
settings.item_size_max, settings.slab_chunk_size_max); settings.item_size_max, settings.slab_chunk_size_max);
exit(EX_USAGE); exit(EX_USAGE);
} }
if (settings.slab_page_size % settings.slab_chunk_size_max != 0) { if (settings.slab_page_size % settings.slab_chunk_size_max != 0) {
fprintf(stderr, "slab_chunk_max (bytes: %d) must divide evenly into %d ( slab_page_size)\n", fprintf(stderr, "slab_chunk_max (bytes: %d) must divide evenly into %d ( slab_page_size)\n",
settings.slab_chunk_size_max, settings.slab_page_size); settings.slab_chunk_size_max, settings.slab_page_size);
exit(EX_USAGE); exit(EX_USAGE);
} }
#ifdef EXTSTORE #ifdef EXTSTORE
if (storage_file) { switch (storage_check_config(storage_cf)) {
if (settings.item_size_max > ext_cf.wbuf_size) { case 0:
fprintf(stderr, "-I (item_size_max: %d) cannot be larger than ext_wb storage_enabled = true;
uf_size: %d\n", break;
settings.item_size_max, ext_cf.wbuf_size); case 1:
exit(EX_USAGE);
}
if (settings.udpport) {
fprintf(stderr, "Cannot use UDP with extstore enabled (-U 0 to disab
le)\n");
exit(EX_USAGE); exit(EX_USAGE);
} break;
} }
#endif #endif
// Reserve this for the new default. If factor size hasn't changed, use // Reserve this for the new default. If factor size hasn't changed, use
// new default. // new default.
/*if (settings.slab_chunk_size_max == 16384 && settings.factor == 1.25) { /*if (settings.slab_chunk_size_max == 16384 && settings.factor == 1.25) {
settings.factor = 1.08; settings.factor = 1.08;
}*/ }*/
if (slab_sizes_unparsed != NULL) { if (slab_sizes_unparsed != NULL) {
// want the unedited string for restart code. // want the unedited string for restart code.
skipping to change at line 6131 skipping to change at line 5665
} }
} }
/* initialize other stuff */ /* initialize other stuff */
stats_init(); stats_init();
logger_init(); logger_init();
conn_init(); conn_init();
bool reuse_mem = false; bool reuse_mem = false;
void *mem_base = NULL; void *mem_base = NULL;
bool prefill = false; bool prefill = false;
if (memory_file != NULL) { if (settings.memory_file != NULL) {
preallocate = true; preallocate = true;
// Easier to manage memory if we prefill the global pool when reusing. // Easier to manage memory if we prefill the global pool when reusing.
prefill = true; prefill = true;
restart_register("main", _mc_meta_load_cb, _mc_meta_save_cb, meta); restart_register("main", _mc_meta_load_cb, _mc_meta_save_cb, meta);
reuse_mem = restart_mmap_open(settings.maxbytes, reuse_mem = restart_mmap_open(settings.maxbytes,
memory_file, settings.memory_file,
&mem_base); &mem_base);
// The "save" callback gets called when we're closing out the mmap, // The "save" callback gets called when we're closing out the mmap,
// but we don't know what the mmap_base is until after we call open. // but we don't know what the mmap_base is until after we call open.
// So we pass the struct above but have to fill it in here so the // So we pass the struct above but have to fill it in here so the
// data's available during the save routine. // data's available during the save routine.
meta->mmap_base = mem_base; meta->mmap_base = mem_base;
// Also, the callbacks for load() run before _open returns, so we // Also, the callbacks for load() run before _open returns, so we
// should have the old base in 'meta' as of here. // should have the old base in 'meta' as of here.
} }
// Initialize the hash table _after_ checking restart metadata. // Initialize the hash table _after_ checking restart metadata.
// We override the hash table start argument with what was live // We override the hash table start argument with what was live
// previously, to avoid filling a huge set of items into a tiny hash // previously, to avoid filling a huge set of items into a tiny hash
// table. // table.
assoc_init(settings.hashpower_init); assoc_init(settings.hashpower_init);
#ifdef EXTSTORE #ifdef EXTSTORE
if (storage_file && reuse_mem) { if (storage_enabled && reuse_mem) {
fprintf(stderr, "[restart] memory restart with extstore not presently su pported.\n"); fprintf(stderr, "[restart] memory restart with extstore not presently su pported.\n");
reuse_mem = false; reuse_mem = false;
} }
#endif #endif
slabs_init(settings.maxbytes, settings.factor, preallocate, slabs_init(settings.maxbytes, settings.factor, preallocate,
use_slab_sizes ? slab_sizes : NULL, mem_base, reuse_mem); use_slab_sizes ? slab_sizes : NULL, mem_base, reuse_mem);
#ifdef EXTSTORE #ifdef EXTSTORE
if (storage_file) { if (storage_enabled) {
enum extstore_res eres; storage = storage_init(storage_cf);
if (settings.ext_compact_under == 0) {
// If changing the default fraction (4), change the help text as wel
l.
settings.ext_compact_under = storage_file->page_count / 4;
/* Only rescues non-COLD items if below this threshold */
settings.ext_drop_under = storage_file->page_count / 4;
}
// FIXME: temporarily removed.
crc32c_init();
/* Init free chunks to zero. */
for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) {
settings.ext_free_memchunks[x] = 0;
}
storage = extstore_init(storage_file, &ext_cf, &eres);
if (storage == NULL) { if (storage == NULL) {
fprintf(stderr, "Failed to initialize external storage: %s\n",
extstore_err(eres));
if (eres == EXTSTORE_INIT_OPEN_FAIL) {
perror("extstore open");
}
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
ext_storage = storage; ext_storage = storage;
/* page mover algorithm for extstore needs memory prefilled */ /* page mover algorithm for extstore needs memory prefilled */
prefill = true; prefill = true;
} }
#endif #endif
if (settings.drop_privileges) { if (settings.drop_privileges) {
setup_privilege_violations_handler(); setup_privilege_violations_handler();
} }
if (prefill) if (prefill)
slabs_prefill_global(); slabs_prefill_global();
/* In restartable mode and we've decided to issue a fixup on memory */ /* In restartable mode and we've decided to issue a fixup on memory */
if (memory_file != NULL && reuse_mem) { if (settings.memory_file != NULL && reuse_mem) {
mc_ptr_t old_base = meta->old_base; mc_ptr_t old_base = meta->old_base;
assert(old_base == meta->old_base); assert(old_base == meta->old_base);
// should've pulled in process_started from meta file. // should've pulled in process_started from meta file.
process_started = meta->process_started; process_started = meta->process_started;
// TODO: must be a more canonical way of serializing/deserializing // TODO: must be a more canonical way of serializing/deserializing
// pointers? passing through uint64_t should work, and we're not // pointers? passing through uint64_t should work, and we're not
// annotating the pointer with anything, but it's still slightly // annotating the pointer with anything, but it's still slightly
// insane. // insane.
restart_fixup((void *)old_base); restart_fixup((void *)old_base);
skipping to change at line 6386 skipping to change at line 5902
break; break;
case EXIT_NORMALLY: case EXIT_NORMALLY:
// Don't need to print anything to STDERR for a normal shutdown. // Don't need to print anything to STDERR for a normal shutdown.
break; break;
default: default:
fprintf(stderr, "Exiting on error\n"); fprintf(stderr, "Exiting on error\n");
break; break;
} }
stop_threads(); stop_threads();
if (memory_file != NULL && stop_main_loop == GRACE_STOP) { if (settings.memory_file != NULL && stop_main_loop == GRACE_STOP) {
restart_mmap_close(); restart_mmap_close();
} }
/* remove the PID file if we're a daemon */ /* remove the PID file if we're a daemon */
if (do_daemonize) if (do_daemonize)
remove_pidfile(pid_file); remove_pidfile(pid_file);
/* Clean up strdup() call for bind() address */ /* Clean up strdup() call for bind() address */
if (settings.inter) if (settings.inter)
free(settings.inter); free(settings.inter);
 End of changes. 62 change blocks. 
710 lines changed or deleted 188 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)