"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "thread.c" between
memcached-1.6.8.tar.gz and memcached-1.6.9.tar.gz

About: memcached is a high-performance, distributed memory object caching system, generic in nature, but originally intended for use in speeding up dynamic web applications by alleviating database load.

thread.c  (memcached-1.6.8):thread.c  (memcached-1.6.9)
skipping to change at line 441 skipping to change at line 441
if (settings.read_buf_mem_limit) { if (settings.read_buf_mem_limit) {
int limit = settings.read_buf_mem_limit / settings.num_threads; int limit = settings.read_buf_mem_limit / settings.num_threads;
if (limit < READ_BUFFER_SIZE) { if (limit < READ_BUFFER_SIZE) {
limit = 1; limit = 1;
} else { } else {
limit = limit / READ_BUFFER_SIZE; limit = limit / READ_BUFFER_SIZE;
} }
cache_set_limit(me->rbuf_cache, limit); cache_set_limit(me->rbuf_cache, limit);
} }
#ifdef EXTSTORE me->io_cache = cache_create("io", sizeof(io_pending_t), sizeof(char*), NULL,
me->io_cache = cache_create("io", sizeof(io_wrap), sizeof(char*), NULL, NULL NULL);
);
if (me->io_cache == NULL) { if (me->io_cache == NULL) {
fprintf(stderr, "Failed to create IO object cache\n"); fprintf(stderr, "Failed to create IO object cache\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
#endif
#ifdef TLS #ifdef TLS
if (settings.ssl_enabled) { if (settings.ssl_enabled) {
me->ssl_wbuf = (char *)malloc((size_t)settings.ssl_wbuf_size); me->ssl_wbuf = (char *)malloc((size_t)settings.ssl_wbuf_size);
if (me->ssl_wbuf == NULL) { if (me->ssl_wbuf == NULL) {
fprintf(stderr, "Failed to allocate the SSL write buffer\n"); fprintf(stderr, "Failed to allocate the SSL write buffer\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
#endif #endif
} }
skipping to change at line 537 skipping to change at line 535
#ifdef TLS #ifdef TLS
if (item->ssl) { if (item->ssl) {
SSL_shutdown(item->ssl); SSL_shutdown(item->ssl);
SSL_free(item->ssl); SSL_free(item->ssl);
} }
#endif #endif
close(item->sfd); close(item->sfd);
} }
} else { } else {
c->thread = me; c->thread = me;
#ifdef EXTSTORE
if (c->thread->storage) {
conn_io_queue_add(c, IO_QUEUE_EXTSTORE, c->thread->stora
ge,
storage_submit_cb, storage_complete_cb, storage_fina
lize_cb);
}
#endif
conn_io_queue_add(c, IO_QUEUE_NONE, NULL, NULL, NULL, NULL);
#ifdef TLS #ifdef TLS
if (settings.ssl_enabled && c->ssl != NULL) { if (settings.ssl_enabled && c->ssl != NULL) {
assert(c->thread && c->thread->ssl_wbuf); assert(c->thread && c->thread->ssl_wbuf);
c->ssl_wbuf = c->thread->ssl_wbuf; c->ssl_wbuf = c->thread->ssl_wbuf;
} }
#endif #endif
} }
break; break;
} }
cqi_free(item); cqi_free(item);
skipping to change at line 580 skipping to change at line 586
/* asked to stop */ /* asked to stop */
case 's': case 's':
event_base_loopexit(me->base, NULL); event_base_loopexit(me->base, NULL);
break; break;
} }
} }
/* Which thread we assigned a connection to most recently. */ /* Which thread we assigned a connection to most recently. */
static int last_thread = -1; static int last_thread = -1;
/* Last thread we assigned to a connection based on napi_id */
static int last_thread_by_napi_id = -1;
static LIBEVENT_THREAD *select_thread_round_robin(void)
{
int tid = (last_thread + 1) % settings.num_threads;
last_thread = tid;
return threads + tid;
}
static void reset_threads_napi_id(void)
{
LIBEVENT_THREAD *thread;
int i;
for (i = 0; i < settings.num_threads; i++) {
thread = threads + i;
thread->napi_id = 0;
}
last_thread_by_napi_id = -1;
}
/* Select a worker thread based on the NAPI ID of an incoming connection
* request. NAPI ID is a globally unique ID that identifies a NIC RX queue
* on which a flow is received.
*/
static LIBEVENT_THREAD *select_thread_by_napi_id(int sfd)
{
LIBEVENT_THREAD *thread;
int napi_id, err, i;
socklen_t len;
int tid = -1;
len = sizeof(socklen_t);
err = getsockopt(sfd, SOL_SOCKET, SO_INCOMING_NAPI_ID, &napi_id, &len);
if ((err == -1) || (napi_id == 0)) {
STATS_LOCK();
stats.round_robin_fallback++;
STATS_UNLOCK();
return select_thread_round_robin();
}
select:
for (i = 0; i < settings.num_threads; i++) {
thread = threads + i;
if (last_thread_by_napi_id < i) {
thread->napi_id = napi_id;
last_thread_by_napi_id = i;
tid = i;
break;
}
if (thread->napi_id == napi_id) {
tid = i;
break;
}
}
if (tid == -1) {
STATS_LOCK();
stats.unexpected_napi_ids++;
STATS_UNLOCK();
reset_threads_napi_id();
goto select;
}
return threads + tid;
}
/* /*
* Dispatches a new connection to another thread. This is only ever called * Dispatches a new connection to another thread. This is only ever called
* from the main thread, either during initialization (for UDP) or because * from the main thread, either during initialization (for UDP) or because
* of an incoming connection. * of an incoming connection.
*/ */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport, v oid *ssl) { int read_buffer_size, enum network_transport transport, v oid *ssl) {
CQ_ITEM *item = cqi_new(); CQ_ITEM *item = cqi_new();
char buf[1]; char buf[1];
if (item == NULL) { if (item == NULL) {
close(sfd); close(sfd);
/* given that malloc failed this may also fail, but let's try */ /* given that malloc failed this may also fail, but let's try */
fprintf(stderr, "Failed to allocate memory for connection object\n"); fprintf(stderr, "Failed to allocate memory for connection object\n");
return; return;
} }
int tid = (last_thread + 1) % settings.num_threads; LIBEVENT_THREAD *thread;
LIBEVENT_THREAD *thread = threads + tid;
last_thread = tid; if (!settings.num_napi_ids)
thread = select_thread_round_robin();
else
thread = select_thread_by_napi_id(sfd);
item->sfd = sfd; item->sfd = sfd;
item->init_state = init_state; item->init_state = init_state;
item->event_flags = event_flags; item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size; item->read_buffer_size = read_buffer_size;
item->transport = transport; item->transport = transport;
item->mode = queue_new_conn; item->mode = queue_new_conn;
item->ssl = ssl; item->ssl = ssl;
cq_push(thread->new_conn_queue, item); cq_push(thread->new_conn_queue, item);
 End of changes. 6 change blocks. 
8 lines changed or deleted 88 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)