"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "crawler.c" between
memcached-1.6.8.tar.gz and memcached-1.6.9.tar.gz

About: memcached is a high-performance, distributed memory object caching system, generic in nature, but originally intended for use in speeding up dynamic web applications by alleviating database load.

crawler.c  (memcached-1.6.8):crawler.c  (memcached-1.6.9)
/* Copyright 2016 Netflix. /* Copyright 2016 Netflix.
* *
* Use and distribution licensed under the BSD license. See * Use and distribution licensed under the BSD license. See
* the LICENSE file for full text. * the LICENSE file for full text.
*/ */
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include "memcached.h" #include "memcached.h"
#include "storage.h"
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/resource.h> #include <sys/resource.h>
#include <fcntl.h> #include <fcntl.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <errno.h> #include <errno.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <signal.h> #include <signal.h>
#include <string.h> #include <string.h>
skipping to change at line 189 skipping to change at line 190
* main thread's values too much. Should rethink again. * main thread's values too much. Should rethink again.
*/ */
static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv , int i) { static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv , int i) {
struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data; struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data;
pthread_mutex_lock(&d->lock); pthread_mutex_lock(&d->lock);
crawlerstats_t *s = &d->crawlerstats[i]; crawlerstats_t *s = &d->crawlerstats[i];
int is_flushed = item_is_flushed(search); int is_flushed = item_is_flushed(search);
#ifdef EXTSTORE #ifdef EXTSTORE
bool is_valid = true; bool is_valid = true;
if (search->it_flags & ITEM_HDR) { if (search->it_flags & ITEM_HDR) {
item_hdr *hdr = (item_hdr *)ITEM_data(search); is_valid = storage_validate_item(storage, search);
if (extstore_check(storage, hdr->page_id, hdr->page_version) != 0)
is_valid = false;
} }
#endif #endif
if ((search->exptime != 0 && search->exptime < current_time) if ((search->exptime != 0 && search->exptime < current_time)
|| is_flushed || is_flushed
#ifdef EXTSTORE #ifdef EXTSTORE
|| !is_valid || !is_valid
#endif #endif
) { ) {
crawlers[i].reclaimed++; crawlers[i].reclaimed++;
s->reclaimed++; s->reclaimed++;
skipping to change at line 353 skipping to change at line 352
crawlers[i].it_flags = 0; crawlers[i].it_flags = 0;
crawler_count--; crawler_count--;
do_item_unlinktail_q((item *)&crawlers[i]); do_item_unlinktail_q((item *)&crawlers[i]);
do_item_stats_add_crawl(i, crawlers[i].reclaimed, do_item_stats_add_crawl(i, crawlers[i].reclaimed,
crawlers[i].unfetched, crawlers[i].checked); crawlers[i].unfetched, crawlers[i].checked);
pthread_mutex_unlock(&lru_locks[i]); pthread_mutex_unlock(&lru_locks[i]);
if (active_crawler_mod.mod->doneclass != NULL) if (active_crawler_mod.mod->doneclass != NULL)
active_crawler_mod.mod->doneclass(&active_crawler_mod, i); active_crawler_mod.mod->doneclass(&active_crawler_mod, i);
} }
static void item_crawl_hash(void) {
// get iterator from assoc. can hang for a long time.
// - blocks hash expansion
void *iter = assoc_get_iterator();
int crawls_persleep = settings.crawls_persleep;
item *it = NULL;
// loop while iterator returns something
// - iterator func handles bucket-walking
// - iterator returns with bucket locked.
while (assoc_iterate(iter, &it)) {
// if iterator returns true but no item, we're inbetween buckets and
// can do sleep or cleanup work without holding a lock.
if (it == NULL) {
// - sleep bits from orig loop
if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) {
pthread_mutex_unlock(&lru_crawler_lock);
usleep(settings.lru_crawler_sleep);
pthread_mutex_lock(&lru_crawler_lock);
crawls_persleep = settings.crawls_persleep;
} else if (!settings.lru_crawler_sleep) {
// TODO: only cycle lock every N?
pthread_mutex_unlock(&lru_crawler_lock);
pthread_mutex_lock(&lru_crawler_lock);
}
continue;
}
/* Get memory from bipbuf, if client has no space, flush. */
if (active_crawler_mod.c.c != NULL) {
int ret = lru_crawler_client_getbuf(&active_crawler_mod.c);
if (ret != 0) {
// fail out and finalize.
break;
}
} else if (active_crawler_mod.mod->needs_client) {
// fail out and finalize.
break;
}
// double check that the item isn't in a transitional state.
if (refcount_incr(it) < 2) {
refcount_decr(it);
continue;
}
// FIXME: missing hv and i are fine for metadump eval, but not fine
// for expire eval.
active_crawler_mod.mod->eval(&active_crawler_mod, it, 0, 0);
}
// must finalize or we leave the hash table expansion blocked.
assoc_iterate_final(iter);
return;
}
static void *item_crawler_thread(void *arg) { static void *item_crawler_thread(void *arg) {
int i; int i;
int crawls_persleep = settings.crawls_persleep; int crawls_persleep = settings.crawls_persleep;
pthread_mutex_lock(&lru_crawler_lock); pthread_mutex_lock(&lru_crawler_lock);
pthread_cond_signal(&lru_crawler_cond); pthread_cond_signal(&lru_crawler_cond);
settings.lru_crawler = true; settings.lru_crawler = true;
if (settings.verbose > 2) if (settings.verbose > 2)
fprintf(stderr, "Starting LRU crawler background thread\n"); fprintf(stderr, "Starting LRU crawler background thread\n");
while (do_run_lru_crawler_thread) { while (do_run_lru_crawler_thread) {
pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock); pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
if (crawler_count == -1) {
item_crawl_hash();
crawler_count = 0;
} else {
while (crawler_count) { while (crawler_count) {
item *search = NULL; item *search = NULL;
void *hold_lock = NULL; void *hold_lock = NULL;
for (i = POWER_SMALLEST; i < LARGEST_ID; i++) { for (i = POWER_SMALLEST; i < LARGEST_ID; i++) {
if (crawlers[i].it_flags != 1) { if (crawlers[i].it_flags != 1) {
continue; continue;
} }
/* Get memory from bipbuf, if client has no space, flush. */ /* Get memory from bipbuf, if client has no space, flush. */
skipping to change at line 438 skipping to change at line 497
pthread_mutex_unlock(&lru_crawler_lock); pthread_mutex_unlock(&lru_crawler_lock);
usleep(settings.lru_crawler_sleep); usleep(settings.lru_crawler_sleep);
pthread_mutex_lock(&lru_crawler_lock); pthread_mutex_lock(&lru_crawler_lock);
crawls_persleep = settings.crawls_persleep; crawls_persleep = settings.crawls_persleep;
} else if (!settings.lru_crawler_sleep) { } else if (!settings.lru_crawler_sleep) {
// TODO: only cycle lock every N? // TODO: only cycle lock every N?
pthread_mutex_unlock(&lru_crawler_lock); pthread_mutex_unlock(&lru_crawler_lock);
pthread_mutex_lock(&lru_crawler_lock); pthread_mutex_lock(&lru_crawler_lock);
} }
} }
} } // while
} // if crawler_count
if (active_crawler_mod.mod != NULL) { if (active_crawler_mod.mod != NULL) {
if (active_crawler_mod.mod->finalize != NULL) if (active_crawler_mod.mod->finalize != NULL)
active_crawler_mod.mod->finalize(&active_crawler_mod); active_crawler_mod.mod->finalize(&active_crawler_mod);
while (active_crawler_mod.c.c != NULL && bipbuf_used(active_crawler_mod. c.buf)) { while (active_crawler_mod.c.c != NULL && bipbuf_used(active_crawler_mod. c.buf)) {
lru_crawler_poll(&active_crawler_mod.c); lru_crawler_poll(&active_crawler_mod.c);
} }
// Double checking in case the client closed during the poll // Double checking in case the client closed during the poll
if (active_crawler_mod.c.c != NULL) { if (active_crawler_mod.c.c != NULL) {
lru_crawler_release_client(&active_crawler_mod.c); lru_crawler_release_client(&active_crawler_mod.c);
skipping to change at line 557 skipping to change at line 617
crawlers[sid].remaining = remaining; crawlers[sid].remaining = remaining;
crawlers[sid].slabs_clsid = sid; crawlers[sid].slabs_clsid = sid;
crawlers[sid].reclaimed = 0; crawlers[sid].reclaimed = 0;
crawlers[sid].unfetched = 0; crawlers[sid].unfetched = 0;
crawlers[sid].checked = 0; crawlers[sid].checked = 0;
do_item_linktail_q((item *)&crawlers[sid]); do_item_linktail_q((item *)&crawlers[sid]);
crawler_count++; crawler_count++;
starts++; starts++;
} }
pthread_mutex_unlock(&lru_locks[sid]); pthread_mutex_unlock(&lru_locks[sid]);
if (starts) {
STATS_LOCK();
stats_state.lru_crawler_running = true;
stats.lru_crawler_starts++;
STATS_UNLOCK();
}
return starts; return starts;
} }
static int lru_crawler_set_client(crawler_module_t *cm, void *c, const int sfd) { static int lru_crawler_set_client(crawler_module_t *cm, void *c, const int sfd) {
crawler_client_t *crawlc = &cm->c; crawler_client_t *crawlc = &cm->c;
if (crawlc->c != NULL) { if (crawlc->c != NULL) {
return -1; return -1;
} }
crawlc->c = c; crawlc->c = c;
crawlc->sfd = sfd; crawlc->sfd = sfd;
skipping to change at line 608 skipping to change at line 662
pthread_mutex_unlock(&lru_crawler_lock); pthread_mutex_unlock(&lru_crawler_lock);
block_ae_until = current_time + 60; block_ae_until = current_time + 60;
return -1; return -1;
} }
if (type == CRAWLER_AUTOEXPIRE && block_ae_until > current_time) { if (type == CRAWLER_AUTOEXPIRE && block_ae_until > current_time) {
pthread_mutex_unlock(&lru_crawler_lock); pthread_mutex_unlock(&lru_crawler_lock);
return -1; return -1;
} }
/* hash table walk only supported with metadump for now. */
if (type != CRAWLER_METADUMP && ids == NULL) {
pthread_mutex_unlock(&lru_crawler_lock);
return -2;
}
/* Configure the module */ /* Configure the module */
if (!is_running) { if (!is_running) {
assert(crawler_mod_regs[type] != NULL); assert(crawler_mod_regs[type] != NULL);
active_crawler_mod.mod = crawler_mod_regs[type]; active_crawler_mod.mod = crawler_mod_regs[type];
active_crawler_type = type; active_crawler_type = type;
if (active_crawler_mod.mod->init != NULL) { if (active_crawler_mod.mod->init != NULL) {
active_crawler_mod.mod->init(&active_crawler_mod, data); active_crawler_mod.mod->init(&active_crawler_mod, data);
} }
if (active_crawler_mod.mod->needs_client) { if (active_crawler_mod.mod->needs_client) {
if (c == NULL || sfd == 0) { if (c == NULL || sfd == 0) {
pthread_mutex_unlock(&lru_crawler_lock); pthread_mutex_unlock(&lru_crawler_lock);
return -2; return -2;
} }
if (lru_crawler_set_client(&active_crawler_mod, c, sfd) != 0) { if (lru_crawler_set_client(&active_crawler_mod, c, sfd) != 0) {
pthread_mutex_unlock(&lru_crawler_lock); pthread_mutex_unlock(&lru_crawler_lock);
return -2; return -2;
} }
} }
} }
/* we allow the autocrawler to restart sub-LRU's before completion */ if (ids == NULL) {
for (int sid = POWER_SMALLEST; sid < POWER_LARGEST; sid++) { /* NULL ids means to walk the hash table instead. */
if (ids[sid]) starts = 1;
starts += do_lru_crawler_start(sid, remaining); /* FIXME: hack to signal hash mode to the crawler thread.
* Something more clear would be nice.
*/
crawler_count = -1;
} else {
/* we allow the autocrawler to restart sub-LRU's before completion */
for (int sid = POWER_SMALLEST; sid < POWER_LARGEST; sid++) {
if (ids[sid])
starts += do_lru_crawler_start(sid, remaining);
}
} }
if (starts) { if (starts) {
STATS_LOCK();
stats_state.lru_crawler_running = true;
stats.lru_crawler_starts++;
STATS_UNLOCK();
pthread_cond_signal(&lru_crawler_cond); pthread_cond_signal(&lru_crawler_cond);
} }
pthread_mutex_unlock(&lru_crawler_lock); pthread_mutex_unlock(&lru_crawler_lock);
return starts; return starts;
} }
/* /*
* Also only clear the crawlerstats once per sid. * Also only clear the crawlerstats once per sid.
*/ */
enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_t ype type, enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_t ype type,
void *c, const int sfd, unsigned int remaining) { void *c, const int sfd, unsigned int remaining) {
char *b = NULL; char *b = NULL;
uint32_t sid = 0; uint32_t sid = 0;
int starts = 0; int starts = 0;
uint8_t tocrawl[POWER_LARGEST]; uint8_t tocrawl[POWER_LARGEST];
bool hash_crawl = false;
/* FIXME: I added this while debugging. Don't think it's needed? */ /* FIXME: I added this while debugging. Don't think it's needed? */
memset(tocrawl, 0, sizeof(uint8_t) * POWER_LARGEST); memset(tocrawl, 0, sizeof(uint8_t) * POWER_LARGEST);
if (strcmp(slabs, "all") == 0) { if (strcmp(slabs, "all") == 0) {
for (sid = 0; sid < POWER_LARGEST; sid++) { for (sid = 0; sid < POWER_LARGEST; sid++) {
tocrawl[sid] = 1; tocrawl[sid] = 1;
} }
} else if (strcmp(slabs, "hash") == 0) {
hash_crawl = true;
} else { } else {
for (char *p = strtok_r(slabs, ",", &b); for (char *p = strtok_r(slabs, ",", &b);
p != NULL; p != NULL;
p = strtok_r(NULL, ",", &b)) { p = strtok_r(NULL, ",", &b)) {
if (!safe_strtoul(p, &sid) || sid < POWER_SMALLEST if (!safe_strtoul(p, &sid) || sid < POWER_SMALLEST
|| sid >= MAX_NUMBER_OF_SLAB_CLASSES) { || sid >= MAX_NUMBER_OF_SLAB_CLASSES) {
pthread_mutex_unlock(&lru_crawler_lock);
return CRAWLER_BADCLASS; return CRAWLER_BADCLASS;
} }
tocrawl[sid | TEMP_LRU] = 1; tocrawl[sid | TEMP_LRU] = 1;
tocrawl[sid | HOT_LRU] = 1; tocrawl[sid | HOT_LRU] = 1;
tocrawl[sid | WARM_LRU] = 1; tocrawl[sid | WARM_LRU] = 1;
tocrawl[sid | COLD_LRU] = 1; tocrawl[sid | COLD_LRU] = 1;
} }
} }
starts = lru_crawler_start(tocrawl, remaining, type, NULL, c, sfd); starts = lru_crawler_start(hash_crawl ? NULL : tocrawl, remaining, type, NUL L, c, sfd);
if (starts == -1) { if (starts == -1) {
return CRAWLER_RUNNING; return CRAWLER_RUNNING;
} else if (starts == -2) { } else if (starts == -2) {
return CRAWLER_ERROR; /* FIXME: not very helpful. */ return CRAWLER_ERROR; /* FIXME: not very helpful. */
} else if (starts) { } else if (starts) {
return CRAWLER_OK; return CRAWLER_OK;
} else { } else {
return CRAWLER_NOTSTARTED; return CRAWLER_NOTSTARTED;
} }
} }
 End of changes. 13 change blocks. 
16 lines changed or deleted 91 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)