"Fossies" - the Fresh Open Source Software Archive

Member "memcached-1.6.9/crawler.c" (21 Nov 2020, 25686 Bytes) of package /linux/www/memcached-1.6.9.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "crawler.c", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 1.6.8_vs_1.6.9.

    1 /*  Copyright 2016 Netflix.
    2  *
    3  *  Use and distribution licensed under the BSD license.  See
    4  *  the LICENSE file for full text.
    5  */
    6 
    7 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
    8 #include "memcached.h"
    9 #include "storage.h"
   10 #include <sys/stat.h>
   11 #include <sys/socket.h>
   12 #include <sys/resource.h>
   13 #include <fcntl.h>
   14 #include <netinet/in.h>
   15 #include <errno.h>
   16 #include <stdlib.h>
   17 #include <stdio.h>
   18 #include <signal.h>
   19 #include <string.h>
   20 #include <time.h>
   21 #include <assert.h>
   22 #include <unistd.h>
   23 #include <poll.h>
   24 
   25 #define LARGEST_ID POWER_LARGEST
   26 
   27 typedef struct {
   28     void *c; /* original connection structure. still with source thread attached. */
   29     int sfd; /* client fd. */
   30     bipbuf_t *buf; /* output buffer */
   31     char *cbuf; /* current buffer */
   32 } crawler_client_t;
   33 
   34 typedef struct _crawler_module_t crawler_module_t;
   35 
   36 typedef void (*crawler_eval_func)(crawler_module_t *cm, item *it, uint32_t hv, int slab_cls);
   37 typedef int (*crawler_init_func)(crawler_module_t *cm, void *data); // TODO: init args?
   38 typedef void (*crawler_deinit_func)(crawler_module_t *cm); // TODO: extra args?
   39 typedef void (*crawler_doneclass_func)(crawler_module_t *cm, int slab_cls);
   40 typedef void (*crawler_finalize_func)(crawler_module_t *cm);
   41 
   42 typedef struct {
   43     crawler_init_func init; /* run before crawl starts */
   44     crawler_eval_func eval; /* runs on an item. */
   45     crawler_doneclass_func doneclass; /* runs once per sub-crawler completion. */
   46     crawler_finalize_func finalize; /* runs once when all sub-crawlers are done. */
   47     bool needs_lock; /* whether or not we need the LRU lock held when eval is called */
   48     bool needs_client; /* whether or not to grab onto the remote client */
   49 } crawler_module_reg_t;
   50 
   51 struct _crawler_module_t {
   52     void *data; /* opaque data pointer */
   53     crawler_client_t c;
   54     crawler_module_reg_t *mod;
   55 };
   56 
   57 static int crawler_expired_init(crawler_module_t *cm, void *data);
   58 static void crawler_expired_doneclass(crawler_module_t *cm, int slab_cls);
   59 static void crawler_expired_finalize(crawler_module_t *cm);
   60 static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv, int i);
   61 
   62 crawler_module_reg_t crawler_expired_mod = {
   63     .init = crawler_expired_init,
   64     .eval = crawler_expired_eval,
   65     .doneclass = crawler_expired_doneclass,
   66     .finalize = crawler_expired_finalize,
   67     .needs_lock = true,
   68     .needs_client = false
   69 };
   70 
   71 static void crawler_metadump_eval(crawler_module_t *cm, item *search, uint32_t hv, int i);
   72 static void crawler_metadump_finalize(crawler_module_t *cm);
   73 
   74 crawler_module_reg_t crawler_metadump_mod = {
   75     .init = NULL,
   76     .eval = crawler_metadump_eval,
   77     .doneclass = NULL,
   78     .finalize = crawler_metadump_finalize,
   79     .needs_lock = false,
   80     .needs_client = true
   81 };
   82 
   83 crawler_module_reg_t *crawler_mod_regs[3] = {
   84     &crawler_expired_mod,
   85     &crawler_expired_mod,
   86     &crawler_metadump_mod
   87 };
   88 
   89 static int lru_crawler_client_getbuf(crawler_client_t *c);
   90 crawler_module_t active_crawler_mod;
   91 enum crawler_run_type active_crawler_type;
   92 
   93 static crawler crawlers[LARGEST_ID];
   94 
   95 static int crawler_count = 0;
   96 static volatile int do_run_lru_crawler_thread = 0;
   97 static int lru_crawler_initialized = 0;
   98 static pthread_mutex_t lru_crawler_lock = PTHREAD_MUTEX_INITIALIZER;
   99 static pthread_cond_t  lru_crawler_cond = PTHREAD_COND_INITIALIZER;
  100 #ifdef EXTSTORE
  101 /* TODO: pass this around */
  102 static void *storage;
  103 #endif
  104 
  105 /* Will crawl all slab classes a minimum of once per hour */
  106 #define MAX_MAINTCRAWL_WAIT 60 * 60
  107 
  108 /*** LRU CRAWLER THREAD ***/
  109 
  110 #define LRU_CRAWLER_WRITEBUF 8192
  111 
  112 static void lru_crawler_close_client(crawler_client_t *c) {
  113     //fprintf(stderr, "CRAWLER: Closing client\n");
  114     sidethread_conn_close(c->c);
  115     c->c = NULL;
  116     c->cbuf = NULL;
  117     bipbuf_free(c->buf);
  118     c->buf = NULL;
  119 }
  120 
  121 static void lru_crawler_release_client(crawler_client_t *c) {
  122     //fprintf(stderr, "CRAWLER: Closing client\n");
  123     redispatch_conn(c->c);
  124     c->c = NULL;
  125     c->cbuf = NULL;
  126     bipbuf_free(c->buf);
  127     c->buf = NULL;
  128 }
  129 
  130 static int crawler_expired_init(crawler_module_t *cm, void *data) {
  131     struct crawler_expired_data *d;
  132     if (data != NULL) {
  133         d = data;
  134         d->is_external = true;
  135         cm->data = data;
  136     } else {
  137         // allocate data.
  138         d = calloc(1, sizeof(struct crawler_expired_data));
  139         if (d == NULL) {
  140             return -1;
  141         }
  142         // init lock.
  143         pthread_mutex_init(&d->lock, NULL);
  144         d->is_external = false;
  145         d->start_time = current_time;
  146 
  147         cm->data = d;
  148     }
  149     pthread_mutex_lock(&d->lock);
  150     memset(&d->crawlerstats, 0, sizeof(crawlerstats_t) * POWER_LARGEST);
  151     for (int x = 0; x < POWER_LARGEST; x++) {
  152         d->crawlerstats[x].start_time = current_time;
  153         d->crawlerstats[x].run_complete = false;
  154     }
  155     pthread_mutex_unlock(&d->lock);
  156     return 0;
  157 }
  158 
  159 static void crawler_expired_doneclass(crawler_module_t *cm, int slab_cls) {
  160     struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data;
  161     pthread_mutex_lock(&d->lock);
  162     d->crawlerstats[slab_cls].end_time = current_time;
  163     d->crawlerstats[slab_cls].run_complete = true;
  164     pthread_mutex_unlock(&d->lock);
  165 }
  166 
  167 static void crawler_expired_finalize(crawler_module_t *cm) {
  168     struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data;
  169     pthread_mutex_lock(&d->lock);
  170     d->end_time = current_time;
  171     d->crawl_complete = true;
  172     pthread_mutex_unlock(&d->lock);
  173 
  174     if (!d->is_external) {
  175         free(d);
  176     }
  177 }
  178 
  179 /* I pulled this out to make the main thread clearer, but it reaches into the
  180  * main thread's values too much. Should rethink again.
  181  */
  182 static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv, int i) {
  183     struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data;
  184     pthread_mutex_lock(&d->lock);
  185     crawlerstats_t *s = &d->crawlerstats[i];
  186     int is_flushed = item_is_flushed(search);
  187 #ifdef EXTSTORE
  188     bool is_valid = true;
  189     if (search->it_flags & ITEM_HDR) {
  190         is_valid = storage_validate_item(storage, search);
  191     }
  192 #endif
  193     if ((search->exptime != 0 && search->exptime < current_time)
  194         || is_flushed
  195 #ifdef EXTSTORE
  196         || !is_valid
  197 #endif
  198         ) {
  199         crawlers[i].reclaimed++;
  200         s->reclaimed++;
  201 
  202         if (settings.verbose > 1) {
  203             int ii;
  204             char *key = ITEM_key(search);
  205             fprintf(stderr, "LRU crawler found an expired item (flags: %d, slab: %d): ",
  206                 search->it_flags, search->slabs_clsid);
  207             for (ii = 0; ii < search->nkey; ++ii) {
  208                 fprintf(stderr, "%c", key[ii]);
  209             }
  210             fprintf(stderr, "\n");
  211         }
  212         if ((search->it_flags & ITEM_FETCHED) == 0 && !is_flushed) {
  213             crawlers[i].unfetched++;
  214         }
  215 #ifdef EXTSTORE
  216         STORAGE_delete(storage, search);
  217 #endif
  218         do_item_unlink_nolock(search, hv);
  219         do_item_remove(search);
  220     } else {
  221         s->seen++;
  222         refcount_decr(search);
  223         if (search->exptime == 0) {
  224             s->noexp++;
  225         } else if (search->exptime - current_time > 3599) {
  226             s->ttl_hourplus++;
  227         } else {
  228             rel_time_t ttl_remain = search->exptime - current_time;
  229             int bucket = ttl_remain / 60;
  230             if (bucket <= 60) {
  231                 s->histo[bucket]++;
  232             }
  233         }
  234     }
  235     pthread_mutex_unlock(&d->lock);
  236 }
  237 
  238 static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, int i) {
  239     //int slab_id = CLEAR_LRU(i);
  240     char keybuf[KEY_MAX_URI_ENCODED_LENGTH];
  241     int is_flushed = item_is_flushed(it);
  242     /* Ignore expired content. */
  243     if ((it->exptime != 0 && it->exptime < current_time)
  244         || is_flushed) {
  245         refcount_decr(it);
  246         return;
  247     }
  248     // TODO: uriencode directly into the buffer.
  249     uriencode(ITEM_key(it), keybuf, it->nkey, KEY_MAX_URI_ENCODED_LENGTH);
  250     int total = snprintf(cm->c.cbuf, 4096,
  251             "key=%s exp=%ld la=%llu cas=%llu fetch=%s cls=%u size=%lu\n",
  252             keybuf,
  253             (it->exptime == 0) ? -1 : (long)(it->exptime + process_started),
  254             (unsigned long long)(it->time + process_started),
  255             (unsigned long long)ITEM_get_cas(it),
  256             (it->it_flags & ITEM_FETCHED) ? "yes" : "no",
  257             ITEM_clsid(it),
  258             (unsigned long) ITEM_ntotal(it));
  259     refcount_decr(it);
  260     // TODO: some way of tracking the errors. these are very unlikely though.
  261     if (total >= LRU_CRAWLER_WRITEBUF - 1 || total <= 0) {
  262         /* Failed to write, don't push it. */
  263         return;
  264     }
  265     bipbuf_push(cm->c.buf, total);
  266 }
  267 
  268 static void crawler_metadump_finalize(crawler_module_t *cm) {
  269     if (cm->c.c != NULL) {
  270         // Ensure space for final message.
  271         lru_crawler_client_getbuf(&cm->c);
  272         memcpy(cm->c.cbuf, "END\r\n", 5);
  273         bipbuf_push(cm->c.buf, 5);
  274     }
  275 }
  276 
  277 static int lru_crawler_poll(crawler_client_t *c) {
  278     unsigned char *data;
  279     unsigned int data_size = 0;
  280     struct pollfd to_poll[1];
  281     to_poll[0].fd = c->sfd;
  282     to_poll[0].events = POLLOUT;
  283 
  284     int ret = poll(to_poll, 1, 1000);
  285 
  286     if (ret < 0) {
  287         // fatal.
  288         return -1;
  289     }
  290 
  291     if (ret == 0) return 0;
  292 
  293     if (to_poll[0].revents & POLLIN) {
  294         char buf[1];
  295         int res = ((conn*)c->c)->read(c->c, buf, 1);
  296         if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
  297             lru_crawler_close_client(c);
  298             return -1;
  299         }
  300     }
  301     if ((data = bipbuf_peek_all(c->buf, &data_size)) != NULL) {
  302         if (to_poll[0].revents & (POLLHUP|POLLERR)) {
  303             lru_crawler_close_client(c);
  304             return -1;
  305         } else if (to_poll[0].revents & POLLOUT) {
  306             int total = ((conn*)c->c)->write(c->c, data, data_size);
  307             if (total == -1) {
  308                 if (errno != EAGAIN && errno != EWOULDBLOCK) {
  309                     lru_crawler_close_client(c);
  310                     return -1;
  311                 }
  312             } else if (total == 0) {
  313                 lru_crawler_close_client(c);
  314                 return -1;
  315             } else {
  316                 bipbuf_poll(c->buf, total);
  317             }
  318         }
  319     }
  320     return 0;
  321 }
  322 
  323 /* Grab some space to work with, if none exists, run the poll() loop and wait
  324  * for it to clear up or close.
  325  * Return NULL if closed.
  326  */
  327 static int lru_crawler_client_getbuf(crawler_client_t *c) {
  328     void *buf = NULL;
  329     if (c->c == NULL) return -1;
  330     /* not enough space. */
  331     while ((buf = bipbuf_request(c->buf, LRU_CRAWLER_WRITEBUF)) == NULL) {
  332         // TODO: max loops before closing.
  333         int ret = lru_crawler_poll(c);
  334         if (ret < 0) return ret;
  335     }
  336 
  337     c->cbuf = buf;
  338     return 0;
  339 }
  340 
  341 static void lru_crawler_class_done(int i) {
  342     crawlers[i].it_flags = 0;
  343     crawler_count--;
  344     do_item_unlinktail_q((item *)&crawlers[i]);
  345     do_item_stats_add_crawl(i, crawlers[i].reclaimed,
  346             crawlers[i].unfetched, crawlers[i].checked);
  347     pthread_mutex_unlock(&lru_locks[i]);
  348     if (active_crawler_mod.mod->doneclass != NULL)
  349         active_crawler_mod.mod->doneclass(&active_crawler_mod, i);
  350 }
  351 
  352 static void item_crawl_hash(void) {
  353     // get iterator from assoc. can hang for a long time.
  354     // - blocks hash expansion
  355     void *iter = assoc_get_iterator();
  356     int crawls_persleep = settings.crawls_persleep;
  357     item *it = NULL;
  358 
  359     // loop while iterator returns something
  360     // - iterator func handles bucket-walking
  361     // - iterator returns with bucket locked.
  362     while (assoc_iterate(iter, &it)) {
  363         // if iterator returns true but no item, we're inbetween buckets and
  364         // can do sleep or cleanup work without holding a lock.
  365         if (it == NULL) {
  366             // - sleep bits from orig loop
  367             if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) {
  368                 pthread_mutex_unlock(&lru_crawler_lock);
  369                 usleep(settings.lru_crawler_sleep);
  370                 pthread_mutex_lock(&lru_crawler_lock);
  371                 crawls_persleep = settings.crawls_persleep;
  372             } else if (!settings.lru_crawler_sleep) {
  373                 // TODO: only cycle lock every N?
  374                 pthread_mutex_unlock(&lru_crawler_lock);
  375                 pthread_mutex_lock(&lru_crawler_lock);
  376             }
  377             continue;
  378         }
  379 
  380         /* Get memory from bipbuf, if client has no space, flush. */
  381         if (active_crawler_mod.c.c != NULL) {
  382             int ret = lru_crawler_client_getbuf(&active_crawler_mod.c);
  383             if (ret != 0) {
  384                 // fail out and finalize.
  385                 break;
  386             }
  387         } else if (active_crawler_mod.mod->needs_client) {
  388             // fail out and finalize.
  389             break;
  390         }
  391 
  392         // double check that the item isn't in a transitional state.
  393         if (refcount_incr(it) < 2) {
  394             refcount_decr(it);
  395             continue;
  396         }
  397 
  398         // FIXME: missing hv and i are fine for metadump eval, but not fine
  399         // for expire eval.
  400         active_crawler_mod.mod->eval(&active_crawler_mod, it, 0, 0);
  401     }
  402 
  403     // must finalize or we leave the hash table expansion blocked.
  404     assoc_iterate_final(iter);
  405     return;
  406 }
  407 
  408 static void *item_crawler_thread(void *arg) {
  409     int i;
  410     int crawls_persleep = settings.crawls_persleep;
  411 
  412     pthread_mutex_lock(&lru_crawler_lock);
  413     pthread_cond_signal(&lru_crawler_cond);
  414     settings.lru_crawler = true;
  415     if (settings.verbose > 2)
  416         fprintf(stderr, "Starting LRU crawler background thread\n");
  417     while (do_run_lru_crawler_thread) {
  418     pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
  419 
  420     if (crawler_count == -1) {
  421         item_crawl_hash();
  422         crawler_count = 0;
  423     } else {
  424     while (crawler_count) {
  425         item *search = NULL;
  426         void *hold_lock = NULL;
  427 
  428         for (i = POWER_SMALLEST; i < LARGEST_ID; i++) {
  429             if (crawlers[i].it_flags != 1) {
  430                 continue;
  431             }
  432 
  433             /* Get memory from bipbuf, if client has no space, flush. */
  434             if (active_crawler_mod.c.c != NULL) {
  435                 int ret = lru_crawler_client_getbuf(&active_crawler_mod.c);
  436                 if (ret != 0) {
  437                     lru_crawler_class_done(i);
  438                     continue;
  439                 }
  440             } else if (active_crawler_mod.mod->needs_client) {
  441                 lru_crawler_class_done(i);
  442                 continue;
  443             }
  444             pthread_mutex_lock(&lru_locks[i]);
  445             search = do_item_crawl_q((item *)&crawlers[i]);
  446             if (search == NULL ||
  447                 (crawlers[i].remaining && --crawlers[i].remaining < 1)) {
  448                 if (settings.verbose > 2)
  449                     fprintf(stderr, "Nothing left to crawl for %d\n", i);
  450                 lru_crawler_class_done(i);
  451                 continue;
  452             }
  453             uint32_t hv = hash(ITEM_key(search), search->nkey);
  454             /* Attempt to hash item lock the "search" item. If locked, no
  455              * other callers can incr the refcount
  456              */
  457             if ((hold_lock = item_trylock(hv)) == NULL) {
  458                 pthread_mutex_unlock(&lru_locks[i]);
  459                 continue;
  460             }
  461             /* Now see if the item is refcount locked */
  462             if (refcount_incr(search) != 2) {
  463                 refcount_decr(search);
  464                 if (hold_lock)
  465                     item_trylock_unlock(hold_lock);
  466                 pthread_mutex_unlock(&lru_locks[i]);
  467                 continue;
  468             }
  469 
  470             crawlers[i].checked++;
  471             /* Frees the item or decrements the refcount. */
  472             /* Interface for this could improve: do the free/decr here
  473              * instead? */
  474             if (!active_crawler_mod.mod->needs_lock) {
  475                 pthread_mutex_unlock(&lru_locks[i]);
  476             }
  477 
  478             active_crawler_mod.mod->eval(&active_crawler_mod, search, hv, i);
  479 
  480             if (hold_lock)
  481                 item_trylock_unlock(hold_lock);
  482             if (active_crawler_mod.mod->needs_lock) {
  483                 pthread_mutex_unlock(&lru_locks[i]);
  484             }
  485 
  486             if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) {
  487                 pthread_mutex_unlock(&lru_crawler_lock);
  488                 usleep(settings.lru_crawler_sleep);
  489                 pthread_mutex_lock(&lru_crawler_lock);
  490                 crawls_persleep = settings.crawls_persleep;
  491             } else if (!settings.lru_crawler_sleep) {
  492                 // TODO: only cycle lock every N?
  493                 pthread_mutex_unlock(&lru_crawler_lock);
  494                 pthread_mutex_lock(&lru_crawler_lock);
  495             }
  496         }
  497     } // while
  498     } // if crawler_count
  499 
  500     if (active_crawler_mod.mod != NULL) {
  501         if (active_crawler_mod.mod->finalize != NULL)
  502             active_crawler_mod.mod->finalize(&active_crawler_mod);
  503         while (active_crawler_mod.c.c != NULL && bipbuf_used(active_crawler_mod.c.buf)) {
  504             lru_crawler_poll(&active_crawler_mod.c);
  505         }
  506         // Double checking in case the client closed during the poll
  507         if (active_crawler_mod.c.c != NULL) {
  508             lru_crawler_release_client(&active_crawler_mod.c);
  509         }
  510         active_crawler_mod.mod = NULL;
  511     }
  512 
  513     if (settings.verbose > 2)
  514         fprintf(stderr, "LRU crawler thread sleeping\n");
  515 
  516     STATS_LOCK();
  517     stats_state.lru_crawler_running = false;
  518     STATS_UNLOCK();
  519     }
  520     pthread_mutex_unlock(&lru_crawler_lock);
  521     if (settings.verbose > 2)
  522         fprintf(stderr, "LRU crawler thread stopping\n");
  523     settings.lru_crawler = false;
  524 
  525     return NULL;
  526 }
  527 
  528 static pthread_t item_crawler_tid;
  529 
  530 int stop_item_crawler_thread(bool wait) {
  531     int ret;
  532     pthread_mutex_lock(&lru_crawler_lock);
  533     if (do_run_lru_crawler_thread == 0) {
  534         pthread_mutex_unlock(&lru_crawler_lock);
  535         return 0;
  536     }
  537     do_run_lru_crawler_thread = 0;
  538     pthread_cond_signal(&lru_crawler_cond);
  539     pthread_mutex_unlock(&lru_crawler_lock);
  540     if (wait && (ret = pthread_join(item_crawler_tid, NULL)) != 0) {
  541         fprintf(stderr, "Failed to stop LRU crawler thread: %s\n", strerror(ret));
  542         return -1;
  543     }
  544     return 0;
  545 }
  546 
  547 /* Lock dance to "block" until thread is waiting on its condition:
  548  * caller locks mtx. caller spawns thread.
  549  * thread blocks on mutex.
  550  * caller waits on condition, releases lock.
  551  * thread gets lock, sends signal.
  552  * caller can't wait, as thread has lock.
  553  * thread waits on condition, releases lock
  554  * caller wakes on condition, gets lock.
  555  * caller immediately releases lock.
  556  * thread is now safely waiting on condition before the caller returns.
  557  */
  558 int start_item_crawler_thread(void) {
  559     int ret;
  560 
  561     if (settings.lru_crawler)
  562         return -1;
  563     pthread_mutex_lock(&lru_crawler_lock);
  564     do_run_lru_crawler_thread = 1;
  565     if ((ret = pthread_create(&item_crawler_tid, NULL,
  566         item_crawler_thread, NULL)) != 0) {
  567         fprintf(stderr, "Can't create LRU crawler thread: %s\n",
  568             strerror(ret));
  569         pthread_mutex_unlock(&lru_crawler_lock);
  570         return -1;
  571     }
  572     /* Avoid returning until the crawler has actually started */
  573     pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
  574     pthread_mutex_unlock(&lru_crawler_lock);
  575 
  576     return 0;
  577 }
  578 
  579 /* 'remaining' is passed in so the LRU maintainer thread can scrub the whole
  580  * LRU every time.
  581  */
  582 static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
  583     uint32_t sid = id;
  584     int starts = 0;
  585 
  586     pthread_mutex_lock(&lru_locks[sid]);
  587     if (crawlers[sid].it_flags == 0) {
  588         if (settings.verbose > 2)
  589             fprintf(stderr, "Kicking LRU crawler off for LRU %u\n", sid);
  590         crawlers[sid].nbytes = 0;
  591         crawlers[sid].nkey = 0;
  592         crawlers[sid].it_flags = 1; /* For a crawler, this means enabled. */
  593         crawlers[sid].next = 0;
  594         crawlers[sid].prev = 0;
  595         crawlers[sid].time = 0;
  596         if (remaining == LRU_CRAWLER_CAP_REMAINING) {
  597             remaining = do_get_lru_size(sid);
  598         }
  599         /* Values for remaining:
  600          * remaining = 0
  601          * - scan all elements, until a NULL is reached
  602          * - if empty, NULL is reached right away
  603          * remaining = n + 1
  604          * - first n elements are parsed (or until a NULL is reached)
  605          */
  606         if (remaining) remaining++;
  607         crawlers[sid].remaining = remaining;
  608         crawlers[sid].slabs_clsid = sid;
  609         crawlers[sid].reclaimed = 0;
  610         crawlers[sid].unfetched = 0;
  611         crawlers[sid].checked = 0;
  612         do_item_linktail_q((item *)&crawlers[sid]);
  613         crawler_count++;
  614         starts++;
  615     }
  616     pthread_mutex_unlock(&lru_locks[sid]);
  617     return starts;
  618 }
  619 
  620 static int lru_crawler_set_client(crawler_module_t *cm, void *c, const int sfd) {
  621     crawler_client_t *crawlc = &cm->c;
  622     if (crawlc->c != NULL) {
  623         return -1;
  624     }
  625     crawlc->c = c;
  626     crawlc->sfd = sfd;
  627 
  628     crawlc->buf = bipbuf_new(1024 * 128);
  629     if (crawlc->buf == NULL) {
  630         return -2;
  631     }
  632     return 0;
  633 }
  634 
  635 int lru_crawler_start(uint8_t *ids, uint32_t remaining,
  636                              const enum crawler_run_type type, void *data,
  637                              void *c, const int sfd) {
  638     int starts = 0;
  639     bool is_running;
  640     static rel_time_t block_ae_until = 0;
  641     pthread_mutex_lock(&lru_crawler_lock);
  642     STATS_LOCK();
  643     is_running = stats_state.lru_crawler_running;
  644     STATS_UNLOCK();
  645     if (do_run_lru_crawler_thread == 0) {
  646         pthread_mutex_unlock(&lru_crawler_lock);
  647         return -2;
  648     }
  649 
  650     if (is_running &&
  651             !(type == CRAWLER_AUTOEXPIRE && active_crawler_type == CRAWLER_AUTOEXPIRE)) {
  652         pthread_mutex_unlock(&lru_crawler_lock);
  653         block_ae_until = current_time + 60;
  654         return -1;
  655     }
  656 
  657     if (type == CRAWLER_AUTOEXPIRE && block_ae_until > current_time) {
  658         pthread_mutex_unlock(&lru_crawler_lock);
  659         return -1;
  660     }
  661 
  662     /* hash table walk only supported with metadump for now. */
  663     if (type != CRAWLER_METADUMP && ids == NULL) {
  664         pthread_mutex_unlock(&lru_crawler_lock);
  665         return -2;
  666     }
  667 
  668     /* Configure the module */
  669     if (!is_running) {
  670         assert(crawler_mod_regs[type] != NULL);
  671         active_crawler_mod.mod = crawler_mod_regs[type];
  672         active_crawler_type = type;
  673         if (active_crawler_mod.mod->init != NULL) {
  674             active_crawler_mod.mod->init(&active_crawler_mod, data);
  675         }
  676         if (active_crawler_mod.mod->needs_client) {
  677             if (c == NULL || sfd == 0) {
  678                 pthread_mutex_unlock(&lru_crawler_lock);
  679                 return -2;
  680             }
  681             if (lru_crawler_set_client(&active_crawler_mod, c, sfd) != 0) {
  682                 pthread_mutex_unlock(&lru_crawler_lock);
  683                 return -2;
  684             }
  685         }
  686     }
  687 
  688     if (ids == NULL) {
  689         /* NULL ids means to walk the hash table instead. */
  690         starts = 1;
  691         /* FIXME: hack to signal hash mode to the crawler thread.
  692          * Something more clear would be nice.
  693          */
  694         crawler_count = -1;
  695     } else {
  696         /* we allow the autocrawler to restart sub-LRU's before completion */
  697         for (int sid = POWER_SMALLEST; sid < POWER_LARGEST; sid++) {
  698             if (ids[sid])
  699                 starts += do_lru_crawler_start(sid, remaining);
  700         }
  701     }
  702     if (starts) {
  703         STATS_LOCK();
  704         stats_state.lru_crawler_running = true;
  705         stats.lru_crawler_starts++;
  706         STATS_UNLOCK();
  707         pthread_cond_signal(&lru_crawler_cond);
  708     }
  709     pthread_mutex_unlock(&lru_crawler_lock);
  710     return starts;
  711 }
  712 
  713 /*
  714  * Also only clear the crawlerstats once per sid.
  715  */
  716 enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_type type,
  717         void *c, const int sfd, unsigned int remaining) {
  718     char *b = NULL;
  719     uint32_t sid = 0;
  720     int starts = 0;
  721     uint8_t tocrawl[POWER_LARGEST];
  722     bool hash_crawl = false;
  723 
  724     /* FIXME: I added this while debugging. Don't think it's needed? */
  725     memset(tocrawl, 0, sizeof(uint8_t) * POWER_LARGEST);
  726     if (strcmp(slabs, "all") == 0) {
  727         for (sid = 0; sid < POWER_LARGEST; sid++) {
  728             tocrawl[sid] = 1;
  729         }
  730     } else if (strcmp(slabs, "hash") == 0) {
  731         hash_crawl = true;
  732     } else {
  733         for (char *p = strtok_r(slabs, ",", &b);
  734              p != NULL;
  735              p = strtok_r(NULL, ",", &b)) {
  736 
  737             if (!safe_strtoul(p, &sid) || sid < POWER_SMALLEST
  738                     || sid >= MAX_NUMBER_OF_SLAB_CLASSES) {
  739                 return CRAWLER_BADCLASS;
  740             }
  741             tocrawl[sid | TEMP_LRU] = 1;
  742             tocrawl[sid | HOT_LRU] = 1;
  743             tocrawl[sid | WARM_LRU] = 1;
  744             tocrawl[sid | COLD_LRU] = 1;
  745         }
  746     }
  747 
  748     starts = lru_crawler_start(hash_crawl ? NULL : tocrawl, remaining, type, NULL, c, sfd);
  749     if (starts == -1) {
  750         return CRAWLER_RUNNING;
  751     } else if (starts == -2) {
  752         return CRAWLER_ERROR; /* FIXME: not very helpful. */
  753     } else if (starts) {
  754         return CRAWLER_OK;
  755     } else {
  756         return CRAWLER_NOTSTARTED;
  757     }
  758 }
  759 
  760 /* If we hold this lock, crawler can't wake up or move */
  761 void lru_crawler_pause(void) {
  762     pthread_mutex_lock(&lru_crawler_lock);
  763 }
  764 
  765 void lru_crawler_resume(void) {
  766     pthread_mutex_unlock(&lru_crawler_lock);
  767 }
  768 
  769 int init_lru_crawler(void *arg) {
  770     if (lru_crawler_initialized == 0) {
  771 #ifdef EXTSTORE
  772         storage = arg;
  773 #endif
  774         active_crawler_mod.c.c = NULL;
  775         active_crawler_mod.mod = NULL;
  776         active_crawler_mod.data = NULL;
  777         lru_crawler_initialized = 1;
  778     }
  779     return 0;
  780 }