"Fossies" - the Fresh Open Source Software Archive

Member "memcached-1.6.15/storage.c" (30 Mar 2022, 50770 Bytes) of package /linux/www/memcached-1.6.15.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "storage.c" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 1.6.14_vs_1.6.15.

/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include "memcached.h"
#ifdef EXTSTORE

#include "storage.h"
#include "extstore.h"
#include <stdlib.h>
#include <stdio.h>
#include <stddef.h>
#include <string.h>
#include <limits.h>
#include <ctype.h>

#define PAGE_BUCKET_DEFAULT 0
#define PAGE_BUCKET_COMPACT 1
#define PAGE_BUCKET_CHUNKED 2
#define PAGE_BUCKET_LOWTTL  3

/*
 * API functions
 */

// re-cast an io_pending_t into this more descriptive structure.
// the first few items _must_ match the original struct.
typedef struct _io_pending_storage_t {
    int io_queue_type;
    LIBEVENT_THREAD *thread;
    conn *c;
    mc_resp *resp;            /* original struct ends here */
    item *hdr_it;             /* original header item. */
    obj_io io_ctx;            /* embedded extstore IO header */
    unsigned int iovec_data;  /* specific index of data iovec */
    bool noreply;             /* whether the response had noreply set */
    bool miss;                /* signal a miss to unlink hdr_it */
    bool badcrc;              /* signal a crc failure */
    bool active;              /* tells if IO was dispatched or not */
} io_pending_storage_t;
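
// A compile-time layout check along these lines could enforce the prefix
// rule above (illustrative sketch only, not part of upstream; it assumes
// io_pending_t in memcached.h declares the same four leading members):
//
//   _Static_assert(offsetof(io_pending_storage_t, resp) ==
//                  offsetof(io_pending_t, resp),
//                  "io_pending_storage_t must stay prefix-compatible");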

// Only call this if item has ITEM_HDR
bool storage_validate_item(void *e, item *it) {
    item_hdr *hdr = (item_hdr *)ITEM_data(it);
    if (extstore_check(e, hdr->page_id, hdr->page_version) != 0) {
        return false;
    } else {
        return true;
    }
}

void storage_delete(void *e, item *it) {
    if (it->it_flags & ITEM_HDR) {
        item_hdr *hdr = (item_hdr *)ITEM_data(it);
        extstore_delete(e, hdr->page_id, hdr->page_version,
                1, ITEM_ntotal(it));
    }
}

// Function for the extra stats called from a protocol.
// NOTE: This either needs a name change or a wrapper, perhaps?
// it's defined here to reduce exposure of extstore.h to the rest of memcached
// but feels a little off being defined here.
// At the very least maybe "process_storage_stats", in line with making this
// more of a generic wrapper module.
void process_extstore_stats(ADD_STAT add_stats, conn *c) {
    int i;
    char key_str[STAT_KEY_LEN];
    char val_str[STAT_VAL_LEN];
    int klen = 0, vlen = 0;
    struct extstore_stats st;

    assert(add_stats);

    void *storage = c->thread->storage;
    if (storage == NULL) {
        return;
    }
    extstore_get_stats(storage, &st);
    st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data));
    if (st.page_data == NULL) {
        return;
    }
    extstore_get_page_data(storage, &st);

    for (i = 0; i < st.page_count; i++) {
        APPEND_NUM_STAT(i, "version", "%llu",
                (unsigned long long) st.page_data[i].version);
        APPEND_NUM_STAT(i, "bytes", "%llu",
                (unsigned long long) st.page_data[i].bytes_used);
        APPEND_NUM_STAT(i, "bucket", "%u",
                st.page_data[i].bucket);
        APPEND_NUM_STAT(i, "free_bucket", "%u",
                st.page_data[i].free_bucket);
    }
    // page_data is heap-allocated per call; free it or every stats call leaks.
    free(st.page_data);
}
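
// Example of the per-page lines the loop above emits, assuming
// APPEND_NUM_STAT renders keys as "<page>:<name>" as it does for slab
// stats (values invented for illustration):
//   STAT 0:version 482
//   STAT 0:bytes 52481024
//   STAT 0:bucket 0
//   STAT 0:free_bucket 0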

// Additional storage stats for the main stats output.
void storage_stats(ADD_STAT add_stats, conn *c) {
    struct extstore_stats st;
    if (c->thread->storage) {
        STATS_LOCK();
        APPEND_STAT("extstore_compact_lost", "%llu", (unsigned long long)stats.extstore_compact_lost);
        APPEND_STAT("extstore_compact_rescues", "%llu", (unsigned long long)stats.extstore_compact_rescues);
        APPEND_STAT("extstore_compact_skipped", "%llu", (unsigned long long)stats.extstore_compact_skipped);
        STATS_UNLOCK();
        extstore_get_stats(c->thread->storage, &st);
        APPEND_STAT("extstore_page_allocs", "%llu", (unsigned long long)st.page_allocs);
        APPEND_STAT("extstore_page_evictions", "%llu", (unsigned long long)st.page_evictions);
        APPEND_STAT("extstore_page_reclaims", "%llu", (unsigned long long)st.page_reclaims);
        APPEND_STAT("extstore_pages_free", "%llu", (unsigned long long)st.pages_free);
        APPEND_STAT("extstore_pages_used", "%llu", (unsigned long long)st.pages_used);
        APPEND_STAT("extstore_objects_evicted", "%llu", (unsigned long long)st.objects_evicted);
        APPEND_STAT("extstore_objects_read", "%llu", (unsigned long long)st.objects_read);
        APPEND_STAT("extstore_objects_written", "%llu", (unsigned long long)st.objects_written);
        APPEND_STAT("extstore_objects_used", "%llu", (unsigned long long)st.objects_used);
        APPEND_STAT("extstore_bytes_evicted", "%llu", (unsigned long long)st.bytes_evicted);
        APPEND_STAT("extstore_bytes_written", "%llu", (unsigned long long)st.bytes_written);
        APPEND_STAT("extstore_bytes_read", "%llu", (unsigned long long)st.bytes_read);
        APPEND_STAT("extstore_bytes_used", "%llu", (unsigned long long)st.bytes_used);
        APPEND_STAT("extstore_bytes_fragmented", "%llu", (unsigned long long)st.bytes_fragmented);
        APPEND_STAT("extstore_limit_maxbytes", "%llu", (unsigned long long)(st.page_count * st.page_size));
        APPEND_STAT("extstore_io_queue", "%llu", (unsigned long long)(st.io_queue));
    }
}


// FIXME: This runs in the IO thread. To get better IO performance this
// should simply mark the io wrapper with the return value and decrement
// wrapleft; if that hits zero, redispatch. Still a bit of work being done
// in the side thread, but minimized at least.
// TODO: wrap -> p?
static void _storage_get_item_cb(void *e, obj_io *io, int ret) {
    // FIXME: assumes success
    io_pending_storage_t *p = (io_pending_storage_t *)io->data;
    mc_resp *resp = p->resp;
    conn *c = p->c;
    assert(p->active == true);
    item *read_it = (item *)io->buf;
    bool miss = false;

    // TODO: How to do counters for hit/misses?
    if (ret < 1) {
        miss = true;
    } else {
        uint32_t crc2;
        uint32_t crc = (uint32_t) read_it->exptime;
        int x;
        // item is chunked, crc the iovs
        if (io->iov != NULL) {
            // first iov is the header, which we don't use beyond crc
            crc2 = crc32c(0, (char *)io->iov[0].iov_base+STORE_OFFSET, io->iov[0].iov_len-STORE_OFFSET);
            // make sure it's not sent. hack :(
            io->iov[0].iov_len = 0;
            for (x = 1; x < io->iovcnt; x++) {
                crc2 = crc32c(crc2, (char *)io->iov[x].iov_base, io->iov[x].iov_len);
            }
        } else {
            crc2 = crc32c(0, (char *)read_it+STORE_OFFSET, io->len-STORE_OFFSET);
        }

        if (crc != crc2) {
            miss = true;
            p->badcrc = true;
        }
    }

    if (miss) {
        if (p->noreply) {
            // In all GET cases, noreply means we send nothing back.
            resp->skip = true;
        } else {
            // TODO: This should be movable to the worker thread.
            // Convert the binprot response into a miss response.
            // The header requires knowing a bunch of stateful crap, so rather
            // than simply writing out a "new" miss response we mangle what's
            // already there.
            if (c->protocol == binary_prot) {
                protocol_binary_response_header *header =
                    (protocol_binary_response_header *)resp->wbuf;

                // cut the extra nbytes off of the body_len
                uint32_t body_len = ntohl(header->response.bodylen);
                uint8_t hdr_len = header->response.extlen;
                body_len -= resp->iov[p->iovec_data].iov_len + hdr_len;
                resp->tosend -= resp->iov[p->iovec_data].iov_len + hdr_len;
                header->response.extlen = 0;
                header->response.status = (uint16_t)htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
                header->response.bodylen = htonl(body_len);

                // truncate the data response.
                resp->iov[p->iovec_data].iov_len = 0;
                // wipe the extlen iov... wish it was just a flat buffer.
                resp->iov[p->iovec_data-1].iov_len = 0;
                resp->chunked_data_iov = 0;
            } else {
                int i;
                // Meta commands have EN status lines for a miss, rather than
                // END as a trailer as per normal ascii.
                if (resp->iov[0].iov_len >= 3
                        && memcmp(resp->iov[0].iov_base, "VA ", 3) == 0) {
                    // TODO: These miss translators should use specific callback
                    // functions attached to the io wrap. This is weird :(
                    resp->iovcnt = 1;
                    resp->iov[0].iov_len = 4;
                    resp->iov[0].iov_base = "EN\r\n";
                    resp->tosend = 4;
                } else {
                    // Wipe the iovecs up through our data injection.
                    // Allows trailers to be returned (END)
                    for (i = 0; i <= p->iovec_data; i++) {
                        resp->tosend -= resp->iov[i].iov_len;
                        resp->iov[i].iov_len = 0;
                        resp->iov[i].iov_base = NULL;
                    }
                }
                resp->chunked_total = 0;
                resp->chunked_data_iov = 0;
            }
        }
        p->miss = true;
    } else {
        assert(read_it->slabs_clsid != 0);
        // TODO: should always use it instead of ITEM_data to kill more
        // chunked special casing.
        if ((read_it->it_flags & ITEM_CHUNKED) == 0) {
            resp->iov[p->iovec_data].iov_base = ITEM_data(read_it);
        }
        p->miss = false;
    }

    p->active = false;
    //assert(c->io_wrapleft >= 0);

    // All IOs have returned; let's re-attach this connection to our original
    // thread.
    io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type);
    q->count--;
    if (q->count == 0) {
        redispatch_conn(c);
    }
}
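
/*
 * Field overloading used by the flash path (as seen in the read side above
 * and the write side in storage_write() below): while an object lives in
 * extstore, the copy on flash reuses two item header fields instead of
 * growing the on-disk format:
 *
 *   it->exptime holds crc32c() of the item past STORE_OFFSET, checked above;
 *   it->time holds the 32-bit hash value, recovered in recache_or_free().
 *
 * The real exptime is carried by the in-memory ITEM_HDR item and is copied
 * back over the buffer item if it gets recached.
 */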

int storage_get_item(conn *c, item *it, mc_resp *resp) {
#ifdef NEED_ALIGN
    item_hdr hdr;
    memcpy(&hdr, ITEM_data(it), sizeof(hdr));
#else
    item_hdr *hdr = (item_hdr *)ITEM_data(it);
#endif
    io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_EXTSTORE);
    size_t ntotal = ITEM_ntotal(it);
    unsigned int clsid = slabs_clsid(ntotal);
    item *new_it;
    bool chunked = false;
    if (ntotal > settings.slab_chunk_size_max) {
        // Pull a chunked item header.
        uint32_t flags;
        FLAGS_CONV(it, flags);
        new_it = item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, it->nbytes);
        assert(new_it == NULL || (new_it->it_flags & ITEM_CHUNKED));
        chunked = true;
    } else {
        new_it = do_item_alloc_pull(ntotal, clsid);
    }
    if (new_it == NULL)
        return -1;
    // so we can free the chunk on a miss
    new_it->slabs_clsid = clsid;

    io_pending_storage_t *p = do_cache_alloc(c->thread->io_cache);
    // this is a re-cast structure, so assert that we never outsize it.
    assert(sizeof(io_pending_t) >= sizeof(io_pending_storage_t));
    memset(p, 0, sizeof(io_pending_storage_t));
    p->active = true;
    p->miss = false;
    p->badcrc = false;
    p->noreply = c->noreply;
    // io_pending owns the reference for this object now.
    p->hdr_it = it;
    p->resp = resp;
    p->io_queue_type = IO_QUEUE_EXTSTORE;
    obj_io *eio = &p->io_ctx;

    // FIXME: error handling.
    if (chunked) {
        unsigned int ciovcnt = 0;
        size_t remain = new_it->nbytes;
        item_chunk *chunk = (item_chunk *) ITEM_schunk(new_it);
        // TODO: This might make sense as a _global_ cache vs a per-thread.
        // but we still can't load objects requiring > IOV_MAX iovs.
        // In the meantime, these objects are rare/slow enough that
        // malloc/freeing a statically sized object won't cause us much pain.
        eio->iov = malloc(sizeof(struct iovec) * IOV_MAX);
        if (eio->iov == NULL) {
            item_remove(new_it);
            do_cache_free(c->thread->io_cache, p);
            return -1;
        }

        // fill the header so we can get the full data + crc back.
        eio->iov[0].iov_base = new_it;
        eio->iov[0].iov_len = ITEM_ntotal(new_it) - new_it->nbytes;
        ciovcnt++;

        while (remain > 0) {
            chunk = do_item_alloc_chunk(chunk, remain);
            // FIXME: _pure evil_, silently erroring if item is too large.
            if (chunk == NULL || ciovcnt > IOV_MAX-1) {
                item_remove(new_it);
                free(eio->iov);
                // TODO: wrapper function for freeing up an io wrap?
                eio->iov = NULL;
                do_cache_free(c->thread->io_cache, p);
                return -1;
            }
            eio->iov[ciovcnt].iov_base = chunk->data;
            eio->iov[ciovcnt].iov_len = (remain < chunk->size) ? remain : chunk->size;
            chunk->used = (remain < chunk->size) ? remain : chunk->size;
            remain -= chunk->size;
            ciovcnt++;
        }

        eio->iovcnt = ciovcnt;
    }

    // Chunked or not, we reserve a response iov here.
    p->iovec_data = resp->iovcnt;
    int iovtotal = (c->protocol == binary_prot) ? it->nbytes - 2 : it->nbytes;
    if (chunked) {
        resp_add_chunked_iov(resp, new_it, iovtotal);
    } else {
        resp_add_iov(resp, "", iovtotal);
    }

    // We can't bail out anymore, so mc_resp owns the IO from here.
    resp->io_pending = (io_pending_t *)p;

    eio->buf = (void *)new_it;
    p->c = c;

    // We need to stack the sub-struct IOs together for submission.
    eio->next = q->stack_ctx;
    q->stack_ctx = eio;

    // No need to stack the io_pending's together as they live on mc_resp's.
    assert(q->count >= 0);
    q->count++;
    // reference ourselves for the callback.
    eio->data = (void *)p;

    // Now, fill in the obj_io based on what was in our header.
#ifdef NEED_ALIGN
    eio->page_version = hdr.page_version;
    eio->page_id = hdr.page_id;
    eio->offset = hdr.offset;
#else
    eio->page_version = hdr->page_version;
    eio->page_id = hdr->page_id;
    eio->offset = hdr->offset;
#endif
    eio->len = ntotal;
    eio->mode = OBJ_IO_READ;
    eio->cb = _storage_get_item_cb;

    // FIXME: This stat needs to move to reflect # of flash hits vs misses
    // for now it's a good gauge on how often we request out to flash at
    // least.
    pthread_mutex_lock(&c->thread->stats.mutex);
    c->thread->stats.get_extstore++;
    pthread_mutex_unlock(&c->thread->stats.mutex);

    return 0;
}
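
/*
 * Rough lifecycle of one extstore GET as wired up in this file (the queue
 * callbacks are registered elsewhere in the daemon):
 *
 *   worker:    storage_get_item()      allocate buffer item + io_pending,
 *                                      stack the obj_io on q->stack_ctx
 *   worker:    storage_submit_cb()     hand the stack to extstore
 *   IO thread: _storage_get_item_cb()  CRC-check, patch response iovecs,
 *                                      redispatch conn once q->count == 0
 *   worker:    storage_complete_cb()   reset q->stack_ctx for reuse
 *   worker:    storage_finalize_cb()   recache_or_free() after transmit
 */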

void storage_submit_cb(io_queue_t *q) {
    // Don't need to do anything special for extstore.
    extstore_submit(q->ctx, q->stack_ctx);
}

static void recache_or_free(io_pending_t *pending) {
    // re-cast to our specific struct.
    io_pending_storage_t *p = (io_pending_storage_t *)pending;

    conn *c = p->c;
    obj_io *io = &p->io_ctx;
    assert(io != NULL);
    item *it = (item *)io->buf;
    assert(c != NULL);
    bool do_free = true;
    if (p->active) {
        // If request never dispatched, free the read buffer but leave the
        // item header alone.
        do_free = false;
        size_t ntotal = ITEM_ntotal(p->hdr_it);
        slabs_free(it, ntotal, slabs_clsid(ntotal));

        io_queue_t *q = conn_io_queue_get(c, p->io_queue_type);
        q->count--;
        assert(q->count >= 0);
        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.get_aborted_extstore++;
        pthread_mutex_unlock(&c->thread->stats.mutex);
    } else if (p->miss) {
        // If request was ultimately a miss, unlink the header.
        do_free = false;
        size_t ntotal = ITEM_ntotal(p->hdr_it);
        item_unlink(p->hdr_it);
        slabs_free(it, ntotal, slabs_clsid(ntotal));
        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.miss_from_extstore++;
        if (p->badcrc)
            c->thread->stats.badcrc_from_extstore++;
        pthread_mutex_unlock(&c->thread->stats.mutex);
    } else if (settings.ext_recache_rate) {
        // hashvalue is cuddled during store
        uint32_t hv = (uint32_t)it->time;
        // opt to throw away rather than wait on a lock.
        void *hold_lock = item_trylock(hv);
        if (hold_lock != NULL) {
            item *h_it = p->hdr_it;
            uint8_t flags = ITEM_LINKED|ITEM_FETCHED|ITEM_ACTIVE;
            // Item must be recently hit at least twice to recache.
            if (((h_it->it_flags & flags) == flags) &&
                    h_it->time > current_time - ITEM_UPDATE_INTERVAL &&
                    c->recache_counter++ % settings.ext_recache_rate == 0) {
                do_free = false;
                // In case it's been updated.
                it->exptime = h_it->exptime;
                it->it_flags &= ~ITEM_LINKED;
                it->refcount = 0;
                it->h_next = NULL; // might not be necessary.
                STORAGE_delete(c->thread->storage, h_it);
                item_replace(h_it, it, hv);
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.recache_from_extstore++;
                pthread_mutex_unlock(&c->thread->stats.mutex);
            }
        }
        if (hold_lock)
            item_trylock_unlock(hold_lock);
    }
    if (do_free)
        slabs_free(it, ITEM_ntotal(it), ITEM_clsid(it));

    p->io_ctx.buf = NULL;
    p->io_ctx.next = NULL;
    p->active = false;

    // TODO: reuse lock and/or hv.
    item_remove(p->hdr_it);
}

// Called after the IO is processed but before the response is transmitted.
// TODO: stubbed with a reminder: should be able to move most of the extstore
// callback code into this code instead, executing on worker thread instead of
// IO thread.
void storage_complete_cb(io_queue_t *q) {
    // need to reset the stack for next use.
    q->stack_ctx = NULL;
    return;
}

// Called after responses have been transmitted. Need to free up related data.
void storage_finalize_cb(io_pending_t *pending) {
    recache_or_free(pending);
    io_pending_storage_t *p = (io_pending_storage_t *)pending;
    obj_io *io = &p->io_ctx;
    // malloc'ed iovec list used for chunked extstore fetches.
    if (io->iov) {
        free(io->iov);
        io->iov = NULL;
    }
    // don't need to free the main context, since it's embedded.
}

/*
 * WRITE FLUSH THREAD
 */

static int storage_write(void *storage, const int clsid, const int item_age) {
    int did_moves = 0;
    struct lru_pull_tail_return it_info;

    it_info.it = NULL;
    lru_pull_tail(clsid, COLD_LRU, 0, LRU_PULL_RETURN_ITEM, 0, &it_info);
    /* Item is locked, and we have a reference to it. */
    if (it_info.it == NULL) {
        return did_moves;
    }

    obj_io io;
    item *it = it_info.it;
    /* First, storage for the header object */
    size_t orig_ntotal = ITEM_ntotal(it);
    uint32_t flags;
    if ((it->it_flags & ITEM_HDR) == 0 &&
            (item_age == 0 || current_time - it->time > item_age)) {
        FLAGS_CONV(it, flags);
        item *hdr_it = do_item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, sizeof(item_hdr));
        /* Run the storage write understanding the start of the item is dirty.
         * We will fill it (time/exptime/etc) from the header item on read.
         */
        if (hdr_it != NULL) {
            int bucket = (it->it_flags & ITEM_CHUNKED) ?
                PAGE_BUCKET_CHUNKED : PAGE_BUCKET_DEFAULT;
            // Compress soon-to-expire items into similar pages.
            if (it->exptime - current_time < settings.ext_low_ttl) {
                bucket = PAGE_BUCKET_LOWTTL;
            }
            hdr_it->it_flags |= ITEM_HDR;
            io.len = orig_ntotal;
            io.mode = OBJ_IO_WRITE;
            // NOTE: when the item is read back in, the slab mover
            // may see it. Important to have refcount>=2 or ~ITEM_LINKED
            assert(it->refcount >= 2);
            // NOTE: write bucket vs free page bucket will disambiguate once
            // lowttl feature is better understood.
            if (extstore_write_request(storage, bucket, bucket, &io) == 0) {
                // cuddle the hash value into the time field so we don't have
                // to recalculate it.
                item *buf_it = (item *) io.buf;
                buf_it->time = it_info.hv;
                // copy from past the headers + time headers.
                // TODO: should be in items.c
                if (it->it_flags & ITEM_CHUNKED) {
                    // Need to loop through the item and copy
                    item_chunk *sch = (item_chunk *) ITEM_schunk(it);
                    int remain = orig_ntotal;
                    int copied = 0;
                    // copy original header
                    int hdrtotal = ITEM_ntotal(it) - it->nbytes;
                    memcpy((char *)io.buf+STORE_OFFSET, (char *)it+STORE_OFFSET, hdrtotal - STORE_OFFSET);
                    copied = hdrtotal;
                    // copy data in as if it were one large object.
                    while (sch && remain) {
                        assert(remain >= sch->used);
                        memcpy((char *)io.buf+copied, sch->data, sch->used);
                        // FIXME: use one variable?
                        remain -= sch->used;
                        copied += sch->used;
                        sch = sch->next;
                    }
                } else {
                    memcpy((char *)io.buf+STORE_OFFSET, (char *)it+STORE_OFFSET, io.len-STORE_OFFSET);
                }
                // crc what we copied so we can do it in one sequential pass.
                buf_it->it_flags &= ~ITEM_LINKED;
                buf_it->exptime = crc32c(0, (char*)io.buf+STORE_OFFSET, orig_ntotal-STORE_OFFSET);
                extstore_write(storage, &io);
                item_hdr *hdr = (item_hdr *) ITEM_data(hdr_it);
                hdr->page_version = io.page_version;
                hdr->page_id = io.page_id;
                hdr->offset  = io.offset;
                // overload nbytes for the header it
                hdr_it->nbytes = it->nbytes;
                /* success! Now we need to fill relevant data into the new
                 * header and replace. Most of this requires the item lock
                 */
                /* CAS gets set while linking. Copy post-replace */
                item_replace(it, hdr_it, it_info.hv);
                ITEM_set_cas(hdr_it, ITEM_get_cas(it));
                do_item_remove(hdr_it);
                did_moves = 1;
                LOGGER_LOG(NULL, LOG_EVICTIONS, LOGGER_EXTSTORE_WRITE, it, bucket);
            } else {
                /* Failed to write for some reason, can't continue. */
                slabs_free(hdr_it, ITEM_ntotal(hdr_it), ITEM_clsid(hdr_it));
            }
        }
    }
    do_item_remove(it);
    item_unlock(it_info.hv);
    return did_moves;
}
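
/*
 * After a successful storage_write(), the original item has been replaced
 * in the hash table by a small ITEM_HDR item whose data payload is an
 * item_hdr recording where the full object went: page_id, page_version
 * and offset. storage_get_item() above consumes exactly those three
 * fields to build the obj_io for the read back.
 */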

static pthread_t storage_write_tid;
static pthread_mutex_t storage_write_plock;
#define WRITE_SLEEP_MIN 500

static void *storage_write_thread(void *arg) {
    void *storage = arg;
    // NOTE: ignoring overflow since that would take years of uptime in a
    // specific load pattern of never going to sleep.
    unsigned int backoff[MAX_NUMBER_OF_SLAB_CLASSES] = {0};
    unsigned int counter = 0;
    useconds_t to_sleep = WRITE_SLEEP_MIN;
    logger *l = logger_create();
    if (l == NULL) {
        fprintf(stderr, "Failed to allocate logger for storage write thread\n");
        abort();
    }

    pthread_mutex_lock(&storage_write_plock);

    while (1) {
        // cache per-loop to avoid calls to the slabs_clsid() search loop
        int min_class = slabs_clsid(settings.ext_item_size);
        bool do_sleep = true;
        counter++;
        if (to_sleep > settings.ext_max_sleep)
            to_sleep = settings.ext_max_sleep;

        for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) {
            bool did_move = false;
            bool mem_limit_reached = false;
            unsigned int chunks_free;
            int item_age;
            int target = settings.ext_free_memchunks[x];
            if (min_class > x || (backoff[x] && (counter % backoff[x] != 0))) {
                // Long sleeps mean we should retry classes sooner.
                if (to_sleep > WRITE_SLEEP_MIN * 10)
                    backoff[x] /= 2;
                continue;
            }

            // Avoid extra slab lock calls during heavy writing.
            chunks_free = slabs_available_chunks(x, &mem_limit_reached,
                    NULL);

            // storage_write() will fail and cut the loop after filling the
            // write buffer.
            while (1) {
                // if we are low on chunks and have no spare, push out early.
                if (chunks_free < target && mem_limit_reached) {
                    item_age = 0;
                } else {
                    item_age = settings.ext_item_age;
                }
                if (storage_write(storage, x, item_age)) {
                    chunks_free++; // Allow stopping if we've done enough this loop
                    did_move = true;
                    do_sleep = false;
                    if (to_sleep > WRITE_SLEEP_MIN)
                        to_sleep /= 2;
                } else {
                    break;
                }
            }

            if (!did_move) {
                backoff[x]++;
            } else if (backoff[x]) {
                backoff[x] /= 2;
            }
        }

        // flip lock so we can be paused or stopped
        pthread_mutex_unlock(&storage_write_plock);
        if (do_sleep) {
            usleep(to_sleep);
            to_sleep *= 2;
        }
        pthread_mutex_lock(&storage_write_plock);
    }
    return NULL;
}
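
// Pacing sketch for the loop above, with WRITE_SLEEP_MIN == 500 usec:
// an idle pass doubles to_sleep (500 -> 1000 -> ... up to ext_max_sleep),
// while every flushed item halves it back toward the minimum. Per-class
// backoff[x] grows by one on each pass with no progress and halves on
// progress, so a quiet slab class is only rechecked on every
// backoff[x]-th iteration of the outer loop.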

// TODO
// logger needs logger_destroy() to exist/work before this is safe.
/*int stop_storage_write_thread(void) {
    int ret;
    pthread_mutex_lock(&lru_maintainer_lock);
    do_run_lru_maintainer_thread = 0;
    pthread_mutex_unlock(&lru_maintainer_lock);
    // WAKEUP SIGNAL
    if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
        fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
        return -1;
    }
    settings.lru_maintainer_thread = false;
    return 0;
}*/

void storage_write_pause(void) {
    pthread_mutex_lock(&storage_write_plock);
}

void storage_write_resume(void) {
    pthread_mutex_unlock(&storage_write_plock);
}

int start_storage_write_thread(void *arg) {
    int ret;

    pthread_mutex_init(&storage_write_plock, NULL);
    if ((ret = pthread_create(&storage_write_tid, NULL,
        storage_write_thread, arg)) != 0) {
        fprintf(stderr, "Can't create storage_write thread: %s\n",
            strerror(ret));
        return -1;
    }

    return 0;
}

/*** COMPACTOR ***/

/* Fetch stats from the external storage system and decide to compact.
 * If we're more than half full, start skewing how aggressively to run
 * compaction, up to a desired target when all pages are full.
 */
static int storage_compact_check(void *storage, logger *l,
        uint32_t *page_id, uint64_t *page_version,
        uint64_t *page_size, bool *drop_unread) {
    struct extstore_stats st;
    int x;
    double rate;
    uint64_t frag_limit;
    uint64_t low_version = ULLONG_MAX;
    uint64_t lowest_version = ULLONG_MAX;
    unsigned int low_page = 0;
    unsigned int lowest_page = 0;
    extstore_get_stats(storage, &st);
    if (st.pages_used == 0)
        return 0;

    // let's pick a target "wasted" value and slew.
    if (st.pages_free > settings.ext_compact_under)
        return 0;
    *drop_unread = false;

    // the number of free pages reduces the configured frag limit
    // this allows us to defrag early if pages are very empty.
    rate = 1.0 - ((double)st.pages_free / st.page_count);
    rate *= settings.ext_max_frag;
    frag_limit = st.page_size * rate;
    LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_FRAGINFO,
            NULL, rate, frag_limit);
    st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data));
    if (st.page_data == NULL) {
        return 0;
    }
    extstore_get_page_data(storage, &st);

    // find the oldest page by version that violates the constraint
    for (x = 0; x < st.page_count; x++) {
        if (st.page_data[x].version == 0 ||
            st.page_data[x].bucket == PAGE_BUCKET_LOWTTL)
            continue;
        if (st.page_data[x].version < lowest_version) {
            lowest_page = x;
            lowest_version = st.page_data[x].version;
        }
        if (st.page_data[x].bytes_used < frag_limit) {
            if (st.page_data[x].version < low_version) {
                low_page = x;
                low_version = st.page_data[x].version;
            }
        }
    }
    *page_size = st.page_size;
    free(st.page_data);

    // we have a page + version to attempt to reclaim.
    if (low_version != ULLONG_MAX) {
        *page_id = low_page;
        *page_version = low_version;
        return 1;
    } else if (lowest_version != ULLONG_MAX && settings.ext_drop_unread
            && st.pages_free <= settings.ext_drop_under) {
        // nothing matched the frag rate barrier, so pick the absolute oldest
        // version if we're configured to drop items.
        *page_id = lowest_page;
        *page_version = lowest_version;
        *drop_unread = true;
        return 1;
    }

    return 0;
}
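
/*
 * Worked example of the slewing above (numbers assumed for illustration):
 * with 64 pages of 64MB each, 8 pages free, and ext_max_frag = 0.8:
 *   rate       = (1.0 - 8/64) * 0.8 = 0.7
 *   frag_limit = 64MB * 0.7         = ~44.8MB
 * so the oldest page using fewer than ~44.8MB of its 64MB is picked for
 * compaction; as pages_free drops toward zero the limit rises toward
 * 0.8 * page_size, making compaction progressively more aggressive.
 */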

static pthread_t storage_compact_tid;
static pthread_mutex_t storage_compact_plock;
#define MIN_STORAGE_COMPACT_SLEEP 10000

struct storage_compact_wrap {
    obj_io io;
    pthread_mutex_t lock; // gates the bools.
    bool done;
    bool submitted;
    bool miss; // version flipped out from under us
};

static void storage_compact_readback(void *storage, logger *l,
        bool drop_unread, char *readback_buf,
        uint32_t page_id, uint64_t page_version, uint64_t read_size) {
    uint64_t offset = 0;
    unsigned int rescues = 0;
    unsigned int lost = 0;
    unsigned int skipped = 0;

    while (offset < read_size) {
        item *hdr_it = NULL;
        item_hdr *hdr = NULL;
        item *it = (item *)(readback_buf+offset);
        unsigned int ntotal;
        // probably zeroed out junk at the end of the wbuf
        if (it->nkey == 0) {
            break;
        }

        ntotal = ITEM_ntotal(it);
        uint32_t hv = (uint32_t)it->time;
        item_lock(hv);
        // We don't have a conn and don't need to do most of do_item_get
        hdr_it = assoc_find(ITEM_key(it), it->nkey, hv);
        if (hdr_it != NULL) {
            bool do_write = false;
            refcount_incr(hdr_it);

            // Check validity but don't bother removing it.
            if ((hdr_it->it_flags & ITEM_HDR) && !item_is_flushed(hdr_it) &&
                   (hdr_it->exptime == 0 || hdr_it->exptime > current_time)) {
                hdr = (item_hdr *)ITEM_data(hdr_it);
                if (hdr->page_id == page_id && hdr->page_version == page_version) {
                    // Item header is still completely valid.
                    extstore_delete(storage, page_id, page_version, 1, ntotal);
                    // drop inactive items.
                    if (drop_unread && GET_LRU(hdr_it->slabs_clsid) == COLD_LRU) {
                        do_write = false;
                        skipped++;
                    } else {
                        do_write = true;
                    }
                }
            }

            if (do_write) {
                bool do_update = false;
                int tries;
                obj_io io;
                io.len = ntotal;
                io.mode = OBJ_IO_WRITE;
                for (tries = 10; tries > 0; tries--) {
                    if (extstore_write_request(storage, PAGE_BUCKET_COMPACT, PAGE_BUCKET_COMPACT, &io) == 0) {
                        memcpy(io.buf, it, io.len);
                        extstore_write(storage, &io);
                        do_update = true;
                        break;
                    } else {
                        usleep(1000);
                    }
                }

                if (do_update) {
                    if (it->refcount == 2) {
                        hdr->page_version = io.page_version;
                        hdr->page_id = io.page_id;
                        hdr->offset = io.offset;
                        rescues++;
                    } else {
                        lost++;
                        // TODO: re-alloc and replace header.
                    }
                } else {
                    lost++;
                }
            }

            do_item_remove(hdr_it);
        }

        item_unlock(hv);
        offset += ntotal;
        if (read_size - offset < sizeof(struct _stritem))
            break;
    }

    STATS_LOCK();
    stats.extstore_compact_lost += lost;
    stats.extstore_compact_rescues += rescues;
    stats.extstore_compact_skipped += skipped;
    STATS_UNLOCK();
    LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_END,
            NULL, page_id, offset, rescues, lost, skipped);
}

static void _storage_compact_cb(void *e, obj_io *io, int ret) {
    struct storage_compact_wrap *wrap = (struct storage_compact_wrap *)io->data;
    assert(wrap->submitted == true);

    pthread_mutex_lock(&wrap->lock);

    if (ret < 1) {
        wrap->miss = true;
    }
    wrap->done = true;

    pthread_mutex_unlock(&wrap->lock);
}

// TODO: hoist the storage bits from lru_maintainer_thread in here.
// Would be nice if they could avoid hammering the same locks, though?
// I guess it's only COLD. That's probably fine.
static void *storage_compact_thread(void *arg) {
    void *storage = arg;
    useconds_t to_sleep = settings.ext_max_sleep;
    bool compacting = false;
    uint64_t page_version = 0;
    uint64_t page_size = 0;
    uint64_t page_offset = 0;
    uint32_t page_id = 0;
    bool drop_unread = false;
    char *readback_buf = NULL;
    struct storage_compact_wrap wrap;

    logger *l = logger_create();
    if (l == NULL) {
        fprintf(stderr, "Failed to allocate logger for storage compaction thread\n");
        abort();
    }

    readback_buf = malloc(settings.ext_wbuf_size);
    if (readback_buf == NULL) {
        fprintf(stderr, "Failed to allocate readback buffer for storage compaction thread\n");
        abort();
    }

    pthread_mutex_init(&wrap.lock, NULL);
    wrap.done = false;
    wrap.submitted = false;
    wrap.io.data = &wrap;
    wrap.io.iov = NULL;
    wrap.io.buf = (void *)readback_buf;

    wrap.io.len = settings.ext_wbuf_size;
    wrap.io.mode = OBJ_IO_READ;
    wrap.io.cb = _storage_compact_cb;
    pthread_mutex_lock(&storage_compact_plock);

    while (1) {
        pthread_mutex_unlock(&storage_compact_plock);
        if (to_sleep) {
            extstore_run_maint(storage);
            usleep(to_sleep);
        }
        pthread_mutex_lock(&storage_compact_plock);

        if (!compacting && storage_compact_check(storage, l,
                    &page_id, &page_version, &page_size, &drop_unread)) {
            page_offset = 0;
            compacting = true;
            LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_START,
                    NULL, page_id, page_version);
        }

        if (compacting) {
            pthread_mutex_lock(&wrap.lock);
            if (page_offset < page_size && !wrap.done && !wrap.submitted) {
                wrap.io.page_version = page_version;
                wrap.io.page_id = page_id;
                wrap.io.offset = page_offset;
                // FIXME: should be smarter about io->next (unlink at use?)
                wrap.io.next = NULL;
                wrap.submitted = true;
                wrap.miss = false;

                extstore_submit(storage, &wrap.io);
            } else if (wrap.miss) {
                LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_ABORT,
                        NULL, page_id);
                wrap.done = false;
                wrap.submitted = false;
                compacting = false;
            } else if (wrap.submitted && wrap.done) {
                LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_START,
                        NULL, page_id, page_offset);
                storage_compact_readback(storage, l, drop_unread,
                        readback_buf, page_id, page_version, settings.ext_wbuf_size);
                page_offset += settings.ext_wbuf_size;
                wrap.done = false;
                wrap.submitted = false;
            } else if (page_offset >= page_size) {
                compacting = false;
                wrap.done = false;
                wrap.submitted = false;
                extstore_close_page(storage, page_id, page_version);
                LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_END,
                        NULL, page_id);
            }
            pthread_mutex_unlock(&wrap.lock);

            // finish actual compaction quickly.
            to_sleep = MIN_STORAGE_COMPACT_SLEEP;
        } else {
            // ramp the sleep back up gradually toward the max; stepping by
            // the full max here would defeat the gradual backoff.
            if (to_sleep < settings.ext_max_sleep)
                to_sleep += MIN_STORAGE_COMPACT_SLEEP;
        }
    }
    free(readback_buf);

    return NULL;
}

// TODO
// logger needs logger_destroy() to exist/work before this is safe.
/*int stop_storage_compact_thread(void) {
    int ret;
    pthread_mutex_lock(&lru_maintainer_lock);
    do_run_lru_maintainer_thread = 0;
    pthread_mutex_unlock(&lru_maintainer_lock);
    if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
        fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
        return -1;
    }
    settings.lru_maintainer_thread = false;
    return 0;
}*/

void storage_compact_pause(void) {
    pthread_mutex_lock(&storage_compact_plock);
}

void storage_compact_resume(void) {
    pthread_mutex_unlock(&storage_compact_plock);
}

int start_storage_compact_thread(void *arg) {
    int ret;

    pthread_mutex_init(&storage_compact_plock, NULL);
    if ((ret = pthread_create(&storage_compact_tid, NULL,
        storage_compact_thread, arg)) != 0) {
        fprintf(stderr, "Can't create storage_compact thread: %s\n",
            strerror(ret));
        return -1;
    }

    return 0;
}

/*** UTILITY ***/
// /path/to/file:100G:bucket1
// FIXME: Modifies argument. copy instead?
struct extstore_conf_file *storage_conf_parse(char *arg, unsigned int page_size) {
    struct extstore_conf_file *cf = NULL;
    char *b = NULL;
    char *p = strtok_r(arg, ":", &b);
    char unit = 0;
    uint64_t multiplier = 0;
    int base_size = 0;
    if (p == NULL)
        goto error;
    // First arg is the filepath.
    cf = calloc(1, sizeof(struct extstore_conf_file));
    if (cf == NULL)
        goto error;
    cf->file = strdup(p);

    p = strtok_r(NULL, ":", &b);
    if (p == NULL) {
        fprintf(stderr, "must supply size to ext_path, ie: ext_path=/f/e:64m (M|G|T|P supported)\n");
        goto error;
    }
    unit = tolower(p[strlen(p)-1]);
    p[strlen(p)-1] = '\0';
    // sigh.
    switch (unit) {
        case 'm':
            multiplier = 1024 * 1024;
            break;
        case 'g':
            multiplier = 1024 * 1024 * 1024;
            break;
        case 't':
            multiplier = 1024 * 1024;
            multiplier *= 1024 * 1024;
            break;
        case 'p':
            multiplier = 1024 * 1024;
            multiplier *= 1024 * 1024 * 1024;
            break;
    }
    base_size = atoi(p);
    multiplier *= base_size;
    // page_count is nearest-but-not-larger-than pages * psize
    cf->page_count = multiplier / page_size;
    assert(page_size * cf->page_count <= multiplier);

    // final token would be a default free bucket
    p = strtok_r(NULL, ",", &b);
    // TODO: We reuse the original DEFINES for now,
    // but if lowttl gets split up this needs to be its own set.
    if (p != NULL) {
        if (strcmp(p, "compact") == 0) {
            cf->free_bucket = PAGE_BUCKET_COMPACT;
        } else if (strcmp(p, "lowttl") == 0) {
            cf->free_bucket = PAGE_BUCKET_LOWTTL;
        } else if (strcmp(p, "chunked") == 0) {
            cf->free_bucket = PAGE_BUCKET_CHUNKED;
        } else if (strcmp(p, "default") == 0) {
            cf->free_bucket = PAGE_BUCKET_DEFAULT;
        } else {
            fprintf(stderr, "Unknown extstore bucket: %s\n", p);
            goto error;
        }
    } else {
        // TODO: is this necessary?
        cf->free_bucket = PAGE_BUCKET_DEFAULT;
    }

    // TODO: disabling until compact algorithm is improved.
    if (cf->free_bucket != PAGE_BUCKET_DEFAULT) {
        fprintf(stderr, "ext_path only presently supports the default bucket\n");
        goto error;
    }

    return cf;
error:
    if (cf) {
        if (cf->file)
            free(cf->file);
        free(cf);
    }
    return NULL;
}
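
/*
 * Example (hypothetical path): parsing "/data/extfile:64g" with the default
 * 64MB page size yields:
 *   cf->file       = "/data/extfile"
 *   multiplier     = 64 * 1024^3 bytes
 *   cf->page_count = multiplier / page_size = 1024 pages
 * The configured size is effectively rounded down to a whole page count.
 */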

struct storage_settings {
    struct extstore_conf_file *storage_file;
    struct extstore_conf ext_cf;
};

void *storage_init_config(struct settings *s) {
    struct storage_settings *cf = calloc(1, sizeof(struct storage_settings));

    s->ext_item_size = 512;
    s->ext_item_age = UINT_MAX;
    s->ext_low_ttl = 0;
    s->ext_recache_rate = 2000;
    s->ext_max_frag = 0.8;
    s->ext_drop_unread = false;
    s->ext_wbuf_size = 1024 * 1024 * 4;
    s->ext_compact_under = 0;
    s->ext_drop_under = 0;
    s->ext_max_sleep = 1000000;
    s->slab_automove_freeratio = 0.01;
    s->ext_page_size = 1024 * 1024 * 64;
    s->ext_io_threadcount = 1;
    cf->ext_cf.page_size = settings.ext_page_size;
    cf->ext_cf.wbuf_size = settings.ext_wbuf_size;
    cf->ext_cf.io_threadcount = settings.ext_io_threadcount;
    cf->ext_cf.io_depth = 1;
    cf->ext_cf.page_buckets = 4;
    cf->ext_cf.wbuf_count = cf->ext_cf.page_buckets;

    return cf;
}

// TODO: pass settings struct?
int storage_read_config(void *conf, char **subopt) {
    struct storage_settings *cf = conf;
    struct extstore_conf *ext_cf = &cf->ext_cf;
    char *subopts_value;

    enum {
        EXT_PAGE_SIZE,
        EXT_WBUF_SIZE,
        EXT_THREADS,
        EXT_IO_DEPTH,
        EXT_PATH,
        EXT_ITEM_SIZE,
        EXT_ITEM_AGE,
        EXT_LOW_TTL,
        EXT_RECACHE_RATE,
        EXT_COMPACT_UNDER,
        EXT_DROP_UNDER,
        EXT_MAX_SLEEP,
        EXT_MAX_FRAG,
        EXT_DROP_UNREAD,
        SLAB_AUTOMOVE_FREERATIO, // FIXME: move this back?
    };

    char *const subopts_tokens[] = {
        [EXT_PAGE_SIZE] = "ext_page_size",
        [EXT_WBUF_SIZE] = "ext_wbuf_size",
        [EXT_THREADS] = "ext_threads",
        [EXT_IO_DEPTH] = "ext_io_depth",
        [EXT_PATH] = "ext_path",
        [EXT_ITEM_SIZE] = "ext_item_size",
        [EXT_ITEM_AGE] = "ext_item_age",
        [EXT_LOW_TTL] = "ext_low_ttl",
        [EXT_RECACHE_RATE] = "ext_recache_rate",
        [EXT_COMPACT_UNDER] = "ext_compact_under",
        [EXT_DROP_UNDER] = "ext_drop_under",
        [EXT_MAX_SLEEP] = "ext_max_sleep",
        [EXT_MAX_FRAG] = "ext_max_frag",
        [EXT_DROP_UNREAD] = "ext_drop_unread",
        [SLAB_AUTOMOVE_FREERATIO] = "slab_automove_freeratio",
        NULL
    };

    switch (getsubopt(subopt, subopts_tokens, &subopts_value)) {
        case EXT_PAGE_SIZE:
            if (cf->storage_file) {
                fprintf(stderr, "Must specify ext_page_size before any ext_path arguments\n");
                return 1;
            }
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_page_size argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &ext_cf->page_size)) {
                fprintf(stderr, "could not parse argument to ext_page_size\n");
                return 1;
            }
            ext_cf->page_size *= 1024 * 1024; /* megabytes */
            break;
        case EXT_WBUF_SIZE:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_wbuf_size argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &ext_cf->wbuf_size)) {
                fprintf(stderr, "could not parse argument to ext_wbuf_size\n");
                return 1;
            }
            ext_cf->wbuf_size *= 1024 * 1024; /* megabytes */
            settings.ext_wbuf_size = ext_cf->wbuf_size;
            break;
        case EXT_THREADS:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_threads argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &ext_cf->io_threadcount)) {
                fprintf(stderr, "could not parse argument to ext_threads\n");
                return 1;
            }
            break;
        case EXT_IO_DEPTH:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_io_depth argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &ext_cf->io_depth)) {
                fprintf(stderr, "could not parse argument to ext_io_depth\n");
                return 1;
            }
            break;
        case EXT_ITEM_SIZE:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_item_size argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_item_size)) {
                fprintf(stderr, "could not parse argument to ext_item_size\n");
                return 1;
            }
            break;
        case EXT_ITEM_AGE:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_item_age argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_item_age)) {
                fprintf(stderr, "could not parse argument to ext_item_age\n");
                return 1;
            }
            break;
        case EXT_LOW_TTL:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_low_ttl argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_low_ttl)) {
                fprintf(stderr, "could not parse argument to ext_low_ttl\n");
                return 1;
            }
            break;
        case EXT_RECACHE_RATE:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_recache_rate argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_recache_rate)) {
                fprintf(stderr, "could not parse argument to ext_recache_rate\n");
                return 1;
            }
            break;
        case EXT_COMPACT_UNDER:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_compact_under argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_compact_under)) {
                fprintf(stderr, "could not parse argument to ext_compact_under\n");
                return 1;
            }
            break;
        case EXT_DROP_UNDER:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_drop_under argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_drop_under)) {
                fprintf(stderr, "could not parse argument to ext_drop_under\n");
                return 1;
            }
            break;
        case EXT_MAX_SLEEP:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_max_sleep argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_max_sleep)) {
                fprintf(stderr, "could not parse argument to ext_max_sleep\n");
                return 1;
            }
            break;
        case EXT_MAX_FRAG:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_max_frag argument\n");
                return 1;
            }
            if (!safe_strtod(subopts_value, &settings.ext_max_frag)) {
                fprintf(stderr, "could not parse argument to ext_max_frag\n");
                return 1;
            }
            break;
        case SLAB_AUTOMOVE_FREERATIO:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing slab_automove_freeratio argument\n");
                return 1;
            }
            if (!safe_strtod(subopts_value, &settings.slab_automove_freeratio)) {
                fprintf(stderr, "could not parse argument to slab_automove_freeratio\n");
                return 1;
            }
            break;
        case EXT_DROP_UNREAD:
            settings.ext_drop_unread = true;
            break;
        case EXT_PATH:
            if (subopts_value) {
                struct extstore_conf_file *tmp = storage_conf_parse(subopts_value, ext_cf->page_size);
                if (tmp == NULL) {
                    fprintf(stderr, "failed to parse ext_path argument\n");
                    return 1;
                }
                if (cf->storage_file != NULL) {
                    tmp->next = cf->storage_file;
                }
                cf->storage_file = tmp;
            } else {
                fprintf(stderr, "missing argument to ext_path, ie: ext_path=/d/file:5G\n");
                return 1;
            }
            break;
        default:
            fprintf(stderr, "Illegal suboption \"%s\"\n", subopts_value);
            return 1;
    }

    return 0;
}
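
/*
 * These suboptions arrive through memcached's -o flag, e.g. (illustrative
 * values, hypothetical path):
 *   memcached -o ext_path=/data/extfile:64g,ext_threads=4,ext_item_age=3600
 * Note that ext_page_size must appear before any ext_path suboption so the
 * page count is computed against the intended page size.
 */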

int storage_check_config(void *conf) {
    struct storage_settings *cf = conf;
    struct extstore_conf *ext_cf = &cf->ext_cf;

    if (cf->storage_file) {
        if (settings.item_size_max > ext_cf->wbuf_size) {
            fprintf(stderr, "-I (item_size_max: %d) cannot be larger than ext_wbuf_size: %d\n",
                settings.item_size_max, ext_cf->wbuf_size);
            return 1;
        }

        if (settings.udpport) {
            fprintf(stderr, "Cannot use UDP with extstore enabled (-U 0 to disable)\n");
            return 1;
        }

        return 0;
    }

    return 2;
}

void *storage_init(void *conf) {
    struct storage_settings *cf = conf;
    struct extstore_conf *ext_cf = &cf->ext_cf;

    enum extstore_res eres;
    void *storage = NULL;
    if (settings.ext_compact_under == 0) {
        // If changing the default fraction (4), change the help text as well.
        settings.ext_compact_under = cf->storage_file->page_count / 4;
        /* Only rescues non-COLD items if below this threshold */
        settings.ext_drop_under = cf->storage_file->page_count / 4;
    }
    crc32c_init();
    /* Init free chunks to zero. */
    for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) {
        settings.ext_free_memchunks[x] = 0;
    }
    storage = extstore_init(cf->storage_file, ext_cf, &eres);
    if (storage == NULL) {
        fprintf(stderr, "Failed to initialize external storage: %s\n",
                extstore_err(eres));
        if (eres == EXTSTORE_INIT_OPEN_FAIL) {
            perror("extstore open");
        }
        return NULL;
    }

    return storage;
}
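
/*
 * Expected call order from the daemon side (sketch; the callers live in
 * memcached.c): storage_init_config() -> storage_read_config() once per -o
 * suboption -> storage_check_config() -> storage_init() -> then
 * start_storage_write_thread() / start_storage_compact_thread().
 */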

#endif