"Fossies" - the Fresh Open Source Software Archive

Member "memcached-1.6.9/storage.c" (21 Nov 2020, 50095 Bytes) of package /linux/www/memcached-1.6.9.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "storage.c" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 1.6.8_vs_1.6.9.

/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include "memcached.h"
#ifdef EXTSTORE

#include "storage.h"
#include "extstore.h"
#include <stdlib.h>
#include <stdio.h>
#include <stddef.h>
#include <string.h>
#include <limits.h>
#include <ctype.h>

#define PAGE_BUCKET_DEFAULT 0
#define PAGE_BUCKET_COMPACT 1
#define PAGE_BUCKET_CHUNKED 2
#define PAGE_BUCKET_LOWTTL  3

/*
 * API functions
 */

// re-cast an io_pending_t into this more descriptive structure.
// the first few items _must_ match the original struct.
typedef struct _io_pending_storage_t {
    io_queue_t *q;
    conn *c;
    mc_resp *resp;            /* original struct ends here */
    item *hdr_it;             /* original header item. */
    obj_io io_ctx;            /* embedded extstore IO header */
    unsigned int iovec_data;  /* specific index of data iovec */
    bool noreply;             /* whether the response had noreply set */
    bool miss;                /* signal a miss to unlink hdr_it */
    bool badcrc;              /* signal a crc failure */
    bool active;              /* tells if IO was dispatched or not */
} io_pending_storage_t;
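
// Illustrative sketch (not part of the upstream file): the re-cast above is
// only safe because both structs share the same leading members. Assuming
// io_pending_t also begins with (q, c, resp), hypothetical compile-time
// checks like these would document the prefix requirement:
#if 0
_Static_assert(offsetof(io_pending_storage_t, q) == offsetof(io_pending_t, q),
        "queue pointer must stay first");
_Static_assert(offsetof(io_pending_storage_t, resp) == offsetof(io_pending_t, resp),
        "shared prefix must match the generic struct");
_Static_assert(sizeof(io_pending_storage_t) <= sizeof(io_pending_t),
        "storage variant must fit the generic allocation");
#endif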

// Only call this if item has ITEM_HDR
bool storage_validate_item(void *e, item *it) {
    item_hdr *hdr = (item_hdr *)ITEM_data(it);
    return extstore_check(e, hdr->page_id, hdr->page_version) == 0;
}

void storage_delete(void *e, item *it) {
    if (it->it_flags & ITEM_HDR) {
        item_hdr *hdr = (item_hdr *)ITEM_data(it);
        extstore_delete(e, hdr->page_id, hdr->page_version,
                1, ITEM_ntotal(it));
    }
}

// Function for the extra stats called from a protocol.
// NOTE: This either needs a name change or a wrapper, perhaps?
// it's defined here to reduce exposure of extstore.h to the rest of memcached
// but feels a little off being defined here.
// At the very least maybe "process_storage_stats", in line with making this
// more of a generic wrapper module.
void process_extstore_stats(ADD_STAT add_stats, conn *c) {
    int i;
    char key_str[STAT_KEY_LEN];
    char val_str[STAT_VAL_LEN];
    int klen = 0, vlen = 0;
    struct extstore_stats st;

    assert(add_stats);

    void *storage = c->thread->storage;
    extstore_get_stats(storage, &st);
    st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data));
    if (st.page_data == NULL) {
        return; // allocation failed; skip the per-page stats entirely.
    }
    extstore_get_page_data(storage, &st);

    for (i = 0; i < st.page_count; i++) {
        APPEND_NUM_STAT(i, "version", "%llu",
                (unsigned long long) st.page_data[i].version);
        APPEND_NUM_STAT(i, "bytes", "%llu",
                (unsigned long long) st.page_data[i].bytes_used);
        APPEND_NUM_STAT(i, "bucket", "%u",
                st.page_data[i].bucket);
        APPEND_NUM_STAT(i, "free_bucket", "%u",
                st.page_data[i].free_bucket);
    }
    free(st.page_data); // was leaked on every stats call.
}
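
// For reference, a sketch of the output this produces (values invented):
// each page reports under its numeric index, so the emitted stat lines look
// roughly like:
//
//   STAT 0:version 42
//   STAT 0:bytes 6291456
//   STAT 0:bucket 0
//   STAT 0:free_bucket 0
//   STAT 1:version 43
//   ...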

// Additional storage stats for the main stats output.
void storage_stats(ADD_STAT add_stats, conn *c) {
    struct extstore_stats st;
    if (c->thread->storage) {
        STATS_LOCK();
        APPEND_STAT("extstore_compact_lost", "%llu", (unsigned long long)stats.extstore_compact_lost);
        APPEND_STAT("extstore_compact_rescues", "%llu", (unsigned long long)stats.extstore_compact_rescues);
        APPEND_STAT("extstore_compact_skipped", "%llu", (unsigned long long)stats.extstore_compact_skipped);
        STATS_UNLOCK();
        extstore_get_stats(c->thread->storage, &st);
        APPEND_STAT("extstore_page_allocs", "%llu", (unsigned long long)st.page_allocs);
        APPEND_STAT("extstore_page_evictions", "%llu", (unsigned long long)st.page_evictions);
        APPEND_STAT("extstore_page_reclaims", "%llu", (unsigned long long)st.page_reclaims);
        APPEND_STAT("extstore_pages_free", "%llu", (unsigned long long)st.pages_free);
        APPEND_STAT("extstore_pages_used", "%llu", (unsigned long long)st.pages_used);
        APPEND_STAT("extstore_objects_evicted", "%llu", (unsigned long long)st.objects_evicted);
        APPEND_STAT("extstore_objects_read", "%llu", (unsigned long long)st.objects_read);
        APPEND_STAT("extstore_objects_written", "%llu", (unsigned long long)st.objects_written);
        APPEND_STAT("extstore_objects_used", "%llu", (unsigned long long)st.objects_used);
        APPEND_STAT("extstore_bytes_evicted", "%llu", (unsigned long long)st.bytes_evicted);
        APPEND_STAT("extstore_bytes_written", "%llu", (unsigned long long)st.bytes_written);
        APPEND_STAT("extstore_bytes_read", "%llu", (unsigned long long)st.bytes_read);
        APPEND_STAT("extstore_bytes_used", "%llu", (unsigned long long)st.bytes_used);
        APPEND_STAT("extstore_bytes_fragmented", "%llu", (unsigned long long)st.bytes_fragmented);
        APPEND_STAT("extstore_limit_maxbytes", "%llu", (unsigned long long)(st.page_count * st.page_size));
        APPEND_STAT("extstore_io_queue", "%llu", (unsigned long long)(st.io_queue));
    }
}

// FIXME: This runs in the IO thread. To get better IO performance this should
// simply mark the io wrapper with the return value and decrement wrapleft,
// redispatching once it hits zero. Still a bit of work being done in the side
// thread, but minimized at least.
// TODO: wrap -> p?
static void _storage_get_item_cb(void *e, obj_io *io, int ret) {
    // FIXME: assumes success
    io_pending_storage_t *p = (io_pending_storage_t *)io->data;
    mc_resp *resp = p->resp;
    conn *c = p->c;
    assert(p->active == true);
    item *read_it = (item *)io->buf;
    bool miss = false;

    // TODO: How to do counters for hits/misses?
    if (ret < 1) {
        miss = true;
    } else {
        uint32_t crc2;
        uint32_t crc = (uint32_t) read_it->exptime;
        int x;
        // item is chunked, crc the iov's
        if (io->iov != NULL) {
            // first iov is the header, which we don't use beyond crc
            crc2 = crc32c(0, (char *)io->iov[0].iov_base+STORE_OFFSET, io->iov[0].iov_len-STORE_OFFSET);
            // make sure it's not sent. hack :(
            io->iov[0].iov_len = 0;
            for (x = 1; x < io->iovcnt; x++) {
                crc2 = crc32c(crc2, (char *)io->iov[x].iov_base, io->iov[x].iov_len);
            }
        } else {
            crc2 = crc32c(0, (char *)read_it+STORE_OFFSET, io->len-STORE_OFFSET);
        }

        if (crc != crc2) {
            miss = true;
            p->badcrc = true;
        }
    }

    if (miss) {
        if (p->noreply) {
            // In all GET cases, noreply means we send nothing back.
            resp->skip = true;
        } else {
            // TODO: This should be movable to the worker thread.
            // Convert the binprot response into a miss response.
            // The header requires knowing a bunch of stateful crap, so rather
            // than simply writing out a "new" miss response we mangle what's
            // already there.
            if (c->protocol == binary_prot) {
                protocol_binary_response_header *header =
                    (protocol_binary_response_header *)resp->wbuf;

                // cut the extra nbytes off of the body_len
                uint32_t body_len = ntohl(header->response.bodylen);
                uint8_t hdr_len = header->response.extlen;
                body_len -= resp->iov[p->iovec_data].iov_len + hdr_len;
                resp->tosend -= resp->iov[p->iovec_data].iov_len + hdr_len;
                header->response.extlen = 0;
                header->response.status = (uint16_t)htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
                header->response.bodylen = htonl(body_len);

                // truncate the data response.
                resp->iov[p->iovec_data].iov_len = 0;
                // wipe the extlen iov... wish it was just a flat buffer.
                resp->iov[p->iovec_data-1].iov_len = 0;
                resp->chunked_data_iov = 0;
            } else {
                int i;
                // Meta commands have EN status lines for a miss, rather than
                // END as a trailer as per normal ascii.
                if (resp->iov[0].iov_len >= 3
                        && memcmp(resp->iov[0].iov_base, "VA ", 3) == 0) {
                    // TODO: These miss translators should use specific callback
                    // functions attached to the io wrap. This is weird :(
                    resp->iovcnt = 1;
                    resp->iov[0].iov_len = 4;
                    resp->iov[0].iov_base = "EN\r\n";
                    resp->tosend = 4;
                } else {
                    // Wipe the iovecs up through our data injection.
                    // Allows trailers to be returned (END)
                    for (i = 0; i <= p->iovec_data; i++) {
                        resp->tosend -= resp->iov[i].iov_len;
                        resp->iov[i].iov_len = 0;
                        resp->iov[i].iov_base = NULL;
                    }
                }
                resp->chunked_total = 0;
                resp->chunked_data_iov = 0;
            }
        }
        p->miss = true;
    } else {
        assert(read_it->slabs_clsid != 0);
        // TODO: should always use it instead of ITEM_data to kill more
        // chunked special casing.
        if ((read_it->it_flags & ITEM_CHUNKED) == 0) {
            resp->iov[p->iovec_data].iov_base = ITEM_data(read_it);
        }
        p->miss = false;
    }

    p->q->count--;
    p->active = false;
    //assert(c->io_wrapleft >= 0);

    // All IOs have returned; let's re-attach this connection to our original
    // thread.
    if (p->q->count == 0) {
        redispatch_conn(c);
    }
}
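
// A minimal sketch (not upstream code) of the on-flash layout the CRC check
// above assumes: the blob is a serialized item whose exptime field was
// repurposed at write time to hold a crc32c of everything past STORE_OFFSET.
// A standalone reader could re-verify a blob like so:
#if 0
static bool blob_crc_ok(const char *buf, size_t len) {
    const item *it = (const item *)buf;
    uint32_t stored = (uint32_t)it->exptime; // crc cuddled into exptime
    return stored == crc32c(0, buf + STORE_OFFSET, len - STORE_OFFSET);
}
#endif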

int storage_get_item(conn *c, item *it, mc_resp *resp) {
#ifdef NEED_ALIGN
    item_hdr hdr;
    memcpy(&hdr, ITEM_data(it), sizeof(hdr));
#else
    item_hdr *hdr = (item_hdr *)ITEM_data(it);
#endif
    io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_EXTSTORE);
    size_t ntotal = ITEM_ntotal(it);
    unsigned int clsid = slabs_clsid(ntotal);
    item *new_it;
    bool chunked = false;
    if (ntotal > settings.slab_chunk_size_max) {
        // Pull a chunked item header.
        uint32_t flags;
        FLAGS_CONV(it, flags);
        new_it = item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, it->nbytes);
        assert(new_it == NULL || (new_it->it_flags & ITEM_CHUNKED));
        chunked = true;
    } else {
        new_it = do_item_alloc_pull(ntotal, clsid);
    }
    if (new_it == NULL)
        return -1;
    // so we can free the chunk on a miss
    new_it->slabs_clsid = clsid;

    io_pending_storage_t *p = do_cache_alloc(c->thread->io_cache);
    // this is a re-cast structure, so assert that we never outsize it.
    assert(sizeof(io_pending_t) >= sizeof(io_pending_storage_t));
    memset(p, 0, sizeof(io_pending_storage_t));
    p->active = true;
    p->miss = false;
    p->badcrc = false;
    p->noreply = c->noreply;
    // io_pending owns the reference for this object now.
    p->hdr_it = it;
    p->resp = resp;
    p->q = q; // quicker access to the queue structure.
    obj_io *eio = &p->io_ctx;

    // FIXME: error handling.
    if (chunked) {
        unsigned int ciovcnt = 0;
        size_t remain = new_it->nbytes;
        item_chunk *chunk = (item_chunk *) ITEM_schunk(new_it);
        // TODO: This might make sense as a _global_ cache vs a per-thread,
        // but we still can't load objects requiring > IOV_MAX iovs.
        // In the meantime, these objects are rare/slow enough that
        // malloc/freeing a statically sized object won't cause us much pain.
        eio->iov = malloc(sizeof(struct iovec) * IOV_MAX);
        if (eio->iov == NULL) {
            item_remove(new_it);
            do_cache_free(c->thread->io_cache, p);
            return -1;
        }

        // fill the header so we can get the full data + crc back.
        eio->iov[0].iov_base = new_it;
        eio->iov[0].iov_len = ITEM_ntotal(new_it) - new_it->nbytes;
        ciovcnt++;

        while (remain > 0) {
            chunk = do_item_alloc_chunk(chunk, remain);
            // FIXME: _pure evil_, silently erroring if item is too large.
            if (chunk == NULL || ciovcnt > IOV_MAX-1) {
                item_remove(new_it);
                free(eio->iov);
                // TODO: wrapper function for freeing up an io wrap?
                eio->iov = NULL;
                do_cache_free(c->thread->io_cache, p);
                return -1;
            }
            eio->iov[ciovcnt].iov_base = chunk->data;
            eio->iov[ciovcnt].iov_len = (remain < chunk->size) ? remain : chunk->size;
            chunk->used = (remain < chunk->size) ? remain : chunk->size;
            remain -= chunk->size;
            ciovcnt++;
        }

        eio->iovcnt = ciovcnt;
    }

    // Chunked or non-chunked, we reserve a response iov here.
    p->iovec_data = resp->iovcnt;
    int iovtotal = (c->protocol == binary_prot) ? it->nbytes - 2 : it->nbytes;
    if (chunked) {
        resp_add_chunked_iov(resp, new_it, iovtotal);
    } else {
        resp_add_iov(resp, "", iovtotal);
    }

    // We can't bail out anymore, so mc_resp owns the IO from here.
    resp->io_pending = (io_pending_t *)p;

    eio->buf = (void *)new_it;
    p->c = c;

    // We need to stack the sub-struct IO's together for submission.
    eio->next = q->stack_ctx;
    q->stack_ctx = eio;

    // No need to stack the io_pending's together as they live on mc_resp's.
    assert(q->count >= 0);
    q->count++;
    // reference ourselves for the callback.
    eio->data = (void *)p;

    // Now fill in the obj_io based on what was in our header.
#ifdef NEED_ALIGN
    eio->page_version = hdr.page_version;
    eio->page_id = hdr.page_id;
    eio->offset = hdr.offset;
#else
    eio->page_version = hdr->page_version;
    eio->page_id = hdr->page_id;
    eio->offset = hdr->offset;
#endif
    eio->len = ntotal;
    eio->mode = OBJ_IO_READ;
    eio->cb = _storage_get_item_cb;

    // FIXME: This stat needs to move to reflect # of flash hits vs misses;
    // for now it's at least a good gauge on how often we request out to flash.
    pthread_mutex_lock(&c->thread->stats.mutex);
    c->thread->stats.get_extstore++;
    pthread_mutex_unlock(&c->thread->stats.mutex);

    return 0;
}
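
// A hedged sketch (not upstream code) of how a read path would use this:
// when a fetched item carries ITEM_HDR, hand it off here and treat a 0
// return as "the response will be filled asynchronously"; -1 means memory
// pressure, so the caller should fall back to a miss.
#if 0
if (it->it_flags & ITEM_HDR) {
    if (storage_get_item(c, it, resp) != 0) {
        // couldn't allocate a read buffer; treat as a miss.
    }
}
#endif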

void storage_submit_cb(void *ctx, void *stack) {
    // Don't need to do anything special for extstore.
    extstore_submit(ctx, stack);
}

static void recache_or_free(io_pending_t *pending) {
    // re-cast to our specific struct.
    io_pending_storage_t *p = (io_pending_storage_t *)pending;

    conn *c = p->c;
    obj_io *io = &p->io_ctx;
    item *it = (item *)io->buf;
    assert(c != NULL);
    assert(io != NULL);
    bool do_free = true;
    if (p->active) {
        // If the request was never dispatched, free the read buffer but
        // leave the item header alone.
        do_free = false;
        size_t ntotal = ITEM_ntotal(p->hdr_it);
        slabs_free(it, ntotal, slabs_clsid(ntotal));
        p->q->count--;
        assert(p->q->count >= 0);
        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.get_aborted_extstore++;
        pthread_mutex_unlock(&c->thread->stats.mutex);
    } else if (p->miss) {
        // If the request was ultimately a miss, unlink the header.
        do_free = false;
        size_t ntotal = ITEM_ntotal(p->hdr_it);
        item_unlink(p->hdr_it);
        slabs_free(it, ntotal, slabs_clsid(ntotal));
        pthread_mutex_lock(&c->thread->stats.mutex);
        c->thread->stats.miss_from_extstore++;
        if (p->badcrc)
            c->thread->stats.badcrc_from_extstore++;
        pthread_mutex_unlock(&c->thread->stats.mutex);
    } else if (settings.ext_recache_rate) {
        // hashvalue is cuddled during store
        uint32_t hv = (uint32_t)it->time;
        // opt to throw away rather than wait on a lock.
        void *hold_lock = item_trylock(hv);
        if (hold_lock != NULL) {
            item *h_it = p->hdr_it;
            uint8_t flags = ITEM_LINKED|ITEM_FETCHED|ITEM_ACTIVE;
            // Item must have been recently hit at least twice to recache.
            if (((h_it->it_flags & flags) == flags) &&
                    h_it->time > current_time - ITEM_UPDATE_INTERVAL &&
                    c->recache_counter++ % settings.ext_recache_rate == 0) {
                do_free = false;
                // In case it's been updated.
                it->exptime = h_it->exptime;
                it->it_flags &= ~ITEM_LINKED;
                it->refcount = 0;
                it->h_next = NULL; // might not be necessary.
                STORAGE_delete(c->thread->storage, h_it);
                item_replace(h_it, it, hv);
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.recache_from_extstore++;
                pthread_mutex_unlock(&c->thread->stats.mutex);
            }
        }
        if (hold_lock)
            item_trylock_unlock(hold_lock);
    }
    if (do_free)
        slabs_free(it, ITEM_ntotal(it), ITEM_clsid(it));

    p->io_ctx.buf = NULL;
    p->io_ctx.next = NULL;
    p->active = false;

    // TODO: reuse lock and/or hv.
    item_remove(p->hdr_it);
}

// Called after the IO is processed but before the response is transmitted.
// TODO: stubbed with a reminder: should be able to move most of the extstore
// callback code into this code instead, executing on the worker thread instead
// of the IO thread.
void storage_complete_cb(void *ctx, void *stack_ctx) {
    return;
}

// Called after responses have been transmitted. Need to free up related data.
void storage_finalize_cb(io_pending_t *pending) {
    recache_or_free(pending);
    io_pending_storage_t *p = (io_pending_storage_t *)pending;
    obj_io *io = &p->io_ctx;
    // malloc'ed iovec list used for chunked extstore fetches.
    if (io->iov) {
        free(io->iov);
        io->iov = NULL;
    }
    // don't need to free the main context, since it's embedded.
}

/*
 * WRITE FLUSH THREAD
 */

static int storage_write(void *storage, const int clsid, const int item_age) {
    int did_moves = 0;
    struct lru_pull_tail_return it_info;

    it_info.it = NULL;
    lru_pull_tail(clsid, COLD_LRU, 0, LRU_PULL_RETURN_ITEM, 0, &it_info);
    /* Item is locked, and we have a reference to it. */
    if (it_info.it == NULL) {
        return did_moves;
    }

    obj_io io;
    item *it = it_info.it;
    /* First, storage for the header object */
    size_t orig_ntotal = ITEM_ntotal(it);
    uint32_t flags;
    if ((it->it_flags & ITEM_HDR) == 0 &&
            (item_age == 0 || current_time - it->time > item_age)) {
        FLAGS_CONV(it, flags);
        item *hdr_it = do_item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, sizeof(item_hdr));
        /* Run the storage write understanding the start of the item is dirty.
         * We will fill it (time/exptime/etc) from the header item on read.
         */
        if (hdr_it != NULL) {
            int bucket = (it->it_flags & ITEM_CHUNKED) ?
                PAGE_BUCKET_CHUNKED : PAGE_BUCKET_DEFAULT;
            // Compress soon-to-expire items into similar pages.
            if (it->exptime - current_time < settings.ext_low_ttl) {
                bucket = PAGE_BUCKET_LOWTTL;
            }
            hdr_it->it_flags |= ITEM_HDR;
            io.len = orig_ntotal;
            io.mode = OBJ_IO_WRITE;
            // NOTE: when the item is read back in, the slab mover
            // may see it. Important to have refcount>=2 or ~ITEM_LINKED
            assert(it->refcount >= 2);
            // NOTE: write bucket vs free page bucket will disambiguate once
            // the lowttl feature is better understood.
            if (extstore_write_request(storage, bucket, bucket, &io) == 0) {
                // cuddle the hash value into the time field so we don't have
                // to recalculate it.
                item *buf_it = (item *) io.buf;
                buf_it->time = it_info.hv;
                // copy from past the headers + time headers.
                // TODO: should be in items.c
                if (it->it_flags & ITEM_CHUNKED) {
                    // Need to loop through the item and copy
                    item_chunk *sch = (item_chunk *) ITEM_schunk(it);
                    int remain = orig_ntotal;
                    int copied = 0;
                    // copy original header
                    int hdrtotal = ITEM_ntotal(it) - it->nbytes;
                    memcpy((char *)io.buf+STORE_OFFSET, (char *)it+STORE_OFFSET, hdrtotal - STORE_OFFSET);
                    copied = hdrtotal;
                    // copy the chunked data in as if it were one large object.
                    while (sch && remain) {
                        assert(remain >= sch->used);
                        memcpy((char *)io.buf+copied, sch->data, sch->used);
                        // FIXME: use one variable?
                        remain -= sch->used;
                        copied += sch->used;
                        sch = sch->next;
                    }
                } else {
                    memcpy((char *)io.buf+STORE_OFFSET, (char *)it+STORE_OFFSET, io.len-STORE_OFFSET);
                }
                // crc what we copied so we can verify it sequentially on read.
                buf_it->it_flags &= ~ITEM_LINKED;
                buf_it->exptime = crc32c(0, (char*)io.buf+STORE_OFFSET, orig_ntotal-STORE_OFFSET);
                extstore_write(storage, &io);
                item_hdr *hdr = (item_hdr *) ITEM_data(hdr_it);
                hdr->page_version = io.page_version;
                hdr->page_id = io.page_id;
                hdr->offset  = io.offset;
                // overload nbytes for the header it
                hdr_it->nbytes = it->nbytes;
                /* success! Now we need to fill relevant data into the new
                 * header and replace. Most of this requires the item lock
                 */
                /* CAS gets set while linking. Copy post-replace */
                item_replace(it, hdr_it, it_info.hv);
                ITEM_set_cas(hdr_it, ITEM_get_cas(it));
                do_item_remove(hdr_it);
                did_moves = 1;
                LOGGER_LOG(NULL, LOG_EVICTIONS, LOGGER_EXTSTORE_WRITE, it, bucket);
            } else {
                /* Failed to write for some reason, can't continue. */
                slabs_free(hdr_it, ITEM_ntotal(hdr_it), ITEM_clsid(hdr_it));
            }
        }
    }
    do_item_remove(it);
    item_unlock(it_info.hv);
    return did_moves;
}
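
// Worked example of the gates above (values invented): with
// settings.ext_item_age = 3600, an item last touched 7200 seconds ago
// qualifies for flushing (7200 > 3600); with settings.ext_low_ttl = 600,
// an item expiring in 300 seconds is steered to PAGE_BUCKET_LOWTTL instead
// of its default bucket, so short-lived data clusters into pages that can
// be reclaimed wholesale rather than compacted.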

static pthread_t storage_write_tid;
static pthread_mutex_t storage_write_plock;
#define WRITE_SLEEP_MAX 1000000
#define WRITE_SLEEP_MIN 500

static void *storage_write_thread(void *arg) {
    void *storage = arg;
    // NOTE: ignoring overflow since that would take years of uptime in a
    // specific load pattern of never going to sleep.
    unsigned int backoff[MAX_NUMBER_OF_SLAB_CLASSES] = {0};
    unsigned int counter = 0;
    useconds_t to_sleep = WRITE_SLEEP_MIN;
    logger *l = logger_create();
    if (l == NULL) {
        fprintf(stderr, "Failed to allocate logger for storage write thread\n");
        abort();
    }

    pthread_mutex_lock(&storage_write_plock);

    while (1) {
        // cache per-loop to avoid calls to the slabs_clsid() search loop
        int min_class = slabs_clsid(settings.ext_item_size);
        bool do_sleep = true;
        counter++;
        if (to_sleep > WRITE_SLEEP_MAX)
            to_sleep = WRITE_SLEEP_MAX;

        for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) {
            bool did_move = false;
            bool mem_limit_reached = false;
            unsigned int chunks_free;
            int item_age;
            int target = settings.ext_free_memchunks[x];
            if (min_class > x || (backoff[x] && (counter % backoff[x] != 0))) {
                // Long sleeps mean we should retry classes sooner.
                if (to_sleep > WRITE_SLEEP_MIN * 10)
                    backoff[x] /= 2;
                continue;
            }

            // Avoid extra slab lock calls during heavy writing.
            chunks_free = slabs_available_chunks(x, &mem_limit_reached,
                    NULL);

            // storage_write() will fail and cut the loop after filling the
            // write buffer.
            while (1) {
                // if we are low on chunks and have no spare, push out early.
                if (chunks_free < target && mem_limit_reached) {
                    item_age = 0;
                } else {
                    item_age = settings.ext_item_age;
                }
                if (storage_write(storage, x, item_age)) {
                    chunks_free++; // Allow stopping if we've done enough this loop
                    did_move = true;
                    do_sleep = false;
                    if (to_sleep > WRITE_SLEEP_MIN)
                        to_sleep /= 2;
                } else {
                    break;
                }
            }

            if (!did_move) {
                backoff[x]++;
            } else if (backoff[x]) {
                backoff[x] /= 2;
            }
        }

        // flip the lock so we can be paused or stopped
        pthread_mutex_unlock(&storage_write_plock);
        if (do_sleep) {
            usleep(to_sleep);
            to_sleep *= 2;
        }
        pthread_mutex_lock(&storage_write_plock);
    }
    return NULL;
}
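
// The net effect (descriptive note, not upstream text): an idle thread's
// sleep doubles from WRITE_SLEEP_MIN (500us) up to WRITE_SLEEP_MAX (1s),
// while any successful write halves it back down, so flush throughput
// tracks demand. The per-class backoff[] array similarly skips classes
// that keep coming up empty, retrying each only every backoff[x]
// iterations.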

// TODO
// logger needs logger_destroy() to exist/work before this is safe.
/*int stop_storage_write_thread(void) {
    int ret;
    pthread_mutex_lock(&lru_maintainer_lock);
    do_run_lru_maintainer_thread = 0;
    pthread_mutex_unlock(&lru_maintainer_lock);
    // WAKEUP SIGNAL
    if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
        fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
        return -1;
    }
    settings.lru_maintainer_thread = false;
    return 0;
}*/

void storage_write_pause(void) {
    pthread_mutex_lock(&storage_write_plock);
}

void storage_write_resume(void) {
    pthread_mutex_unlock(&storage_write_plock);
}

int start_storage_write_thread(void *arg) {
    int ret;

    pthread_mutex_init(&storage_write_plock, NULL);
    if ((ret = pthread_create(&storage_write_tid, NULL,
        storage_write_thread, arg)) != 0) {
        fprintf(stderr, "Can't create storage_write thread: %s\n",
            strerror(ret));
        return -1;
    }

    return 0;
}

/*** COMPACTOR ***/

/* Fetch stats from the external storage system and decide whether to compact.
 * If we're more than half full, start skewing how aggressively to run
 * compaction, up to a desired target when all pages are full.
 */
static int storage_compact_check(void *storage, logger *l,
        uint32_t *page_id, uint64_t *page_version,
        uint64_t *page_size, bool *drop_unread) {
    struct extstore_stats st;
    int x;
    double rate;
    uint64_t frag_limit;
    uint64_t low_version = ULLONG_MAX;
    uint64_t lowest_version = ULLONG_MAX;
    unsigned int low_page = 0;
    unsigned int lowest_page = 0;
    extstore_get_stats(storage, &st);
    if (st.pages_used == 0)
        return 0;

    // let's pick a target "wasted" value and slew.
    if (st.pages_free > settings.ext_compact_under)
        return 0;
    *drop_unread = false;

    // the number of free pages reduces the configured frag limit;
    // this allows us to defrag early if pages are very empty.
    rate = 1.0 - ((double)st.pages_free / st.page_count);
    rate *= settings.ext_max_frag;
    frag_limit = st.page_size * rate;
    LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_FRAGINFO,
            NULL, rate, frag_limit);
    st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data));
    if (st.page_data == NULL) {
        return 0; // allocation failed; try again on the next pass.
    }
    extstore_get_page_data(storage, &st);

    // find the oldest page by version that violates the constraint
    for (x = 0; x < st.page_count; x++) {
        if (st.page_data[x].version == 0 ||
            st.page_data[x].bucket == PAGE_BUCKET_LOWTTL)
            continue;
        if (st.page_data[x].version < lowest_version) {
            lowest_page = x;
            lowest_version = st.page_data[x].version;
        }
        if (st.page_data[x].bytes_used < frag_limit) {
            if (st.page_data[x].version < low_version) {
                low_page = x;
                low_version = st.page_data[x].version;
            }
        }
    }
    *page_size = st.page_size;
    free(st.page_data);

    // we have a page + version to attempt to reclaim.
    if (low_version != ULLONG_MAX) {
        *page_id = low_page;
        *page_version = low_version;
        return 1;
    } else if (lowest_version != ULLONG_MAX && settings.ext_drop_unread
            && st.pages_free <= settings.ext_drop_under) {
        // nothing matched the frag rate barrier, so pick the absolute oldest
        // version if we're configured to drop items.
        *page_id = lowest_page;
        *page_version = lowest_version;
        *drop_unread = true;
        return 1;
    }

    return 0;
}
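
// Worked example of the sliding threshold above (numbers invented): with
// 64MB pages, ext_max_frag = 0.8, and half the pages free, rate is
// (1.0 - 0.5) * 0.8 = 0.4, so frag_limit is ~25.6MB and only pages more
// than ~60% empty are compaction candidates. With no pages free, rate
// rises to 0.8 and any page under ~51.2MB used qualifies.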

static pthread_t storage_compact_tid;
static pthread_mutex_t storage_compact_plock;
#define MIN_STORAGE_COMPACT_SLEEP 10000
#define MAX_STORAGE_COMPACT_SLEEP 2000000

struct storage_compact_wrap {
    obj_io io;
    pthread_mutex_t lock; // gates the bools.
    bool done;
    bool submitted;
    bool miss; // version flipped out from under us
};

static void storage_compact_readback(void *storage, logger *l,
        bool drop_unread, char *readback_buf,
        uint32_t page_id, uint64_t page_version, uint64_t read_size) {
    uint64_t offset = 0;
    unsigned int rescues = 0;
    unsigned int lost = 0;
    unsigned int skipped = 0;

    while (offset < read_size) {
        item *hdr_it = NULL;
        item_hdr *hdr = NULL;
        item *it = (item *)(readback_buf+offset);
        unsigned int ntotal;
        // probably zeroed-out junk at the end of the wbuf
        if (it->nkey == 0) {
            break;
        }

        ntotal = ITEM_ntotal(it);
        uint32_t hv = (uint32_t)it->time;
        item_lock(hv);
        // We don't have a conn and don't need to do most of do_item_get
        hdr_it = assoc_find(ITEM_key(it), it->nkey, hv);
        if (hdr_it != NULL) {
            bool do_write = false;
            refcount_incr(hdr_it);

            // Check validity but don't bother removing it.
            if ((hdr_it->it_flags & ITEM_HDR) && !item_is_flushed(hdr_it) &&
                   (hdr_it->exptime == 0 || hdr_it->exptime > current_time)) {
                hdr = (item_hdr *)ITEM_data(hdr_it);
                if (hdr->page_id == page_id && hdr->page_version == page_version) {
                    // Item header is still completely valid.
                    extstore_delete(storage, page_id, page_version, 1, ntotal);
                    // drop inactive items.
                    if (drop_unread && GET_LRU(hdr_it->slabs_clsid) == COLD_LRU) {
                        do_write = false;
                        skipped++;
                    } else {
                        do_write = true;
                    }
                }
            }

            if (do_write) {
                bool do_update = false;
                int tries;
                obj_io io;
                io.len = ntotal;
                io.mode = OBJ_IO_WRITE;
                for (tries = 10; tries > 0; tries--) {
                    if (extstore_write_request(storage, PAGE_BUCKET_COMPACT, PAGE_BUCKET_COMPACT, &io) == 0) {
                        memcpy(io.buf, it, io.len);
                        extstore_write(storage, &io);
                        do_update = true;
                        break;
                    } else {
                        usleep(1000);
                    }
                }

                if (do_update) {
                    if (it->refcount == 2) {
                        hdr->page_version = io.page_version;
                        hdr->page_id = io.page_id;
                        hdr->offset = io.offset;
                        rescues++;
                    } else {
                        lost++;
                        // TODO: re-alloc and replace the header.
                    }
                } else {
                    lost++;
                }
            }

            do_item_remove(hdr_it);
        }

        item_unlock(hv);
        offset += ntotal;
        if (read_size - offset < sizeof(struct _stritem))
            break;
    }

    STATS_LOCK();
    stats.extstore_compact_lost += lost;
    stats.extstore_compact_rescues += rescues;
    stats.extstore_compact_skipped += skipped;
    STATS_UNLOCK();
    LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_END,
            NULL, page_id, offset, rescues, lost, skipped);
}

static void _storage_compact_cb(void *e, obj_io *io, int ret) {
    struct storage_compact_wrap *wrap = (struct storage_compact_wrap *)io->data;
    assert(wrap->submitted == true);

    pthread_mutex_lock(&wrap->lock);

    if (ret < 1) {
        wrap->miss = true;
    }
    wrap->done = true;

    pthread_mutex_unlock(&wrap->lock);
}

// TODO: hoist the storage bits from lru_maintainer_thread in here.
// would be nice if they could avoid hammering the same locks though?
// I guess it's only COLD. that's probably fine.
static void *storage_compact_thread(void *arg) {
    void *storage = arg;
    useconds_t to_sleep = MAX_STORAGE_COMPACT_SLEEP;
    bool compacting = false;
    uint64_t page_version = 0;
    uint64_t page_size = 0;
    uint64_t page_offset = 0;
    uint32_t page_id = 0;
    bool drop_unread = false;
    char *readback_buf = NULL;
    struct storage_compact_wrap wrap;

    logger *l = logger_create();
    if (l == NULL) {
        fprintf(stderr, "Failed to allocate logger for storage compaction thread\n");
        abort();
    }

    readback_buf = malloc(settings.ext_wbuf_size);
    if (readback_buf == NULL) {
        fprintf(stderr, "Failed to allocate readback buffer for storage compaction thread\n");
        abort();
    }

    pthread_mutex_init(&wrap.lock, NULL);
    wrap.done = false;
    wrap.submitted = false;
    wrap.io.data = &wrap;
    wrap.io.iov = NULL;
    wrap.io.buf = (void *)readback_buf;

    wrap.io.len = settings.ext_wbuf_size;
    wrap.io.mode = OBJ_IO_READ;
    wrap.io.cb = _storage_compact_cb;
    pthread_mutex_lock(&storage_compact_plock);

    while (1) {
        pthread_mutex_unlock(&storage_compact_plock);
        if (to_sleep) {
            extstore_run_maint(storage);
            usleep(to_sleep);
        }
        pthread_mutex_lock(&storage_compact_plock);

        if (!compacting && storage_compact_check(storage, l,
                    &page_id, &page_version, &page_size, &drop_unread)) {
            page_offset = 0;
            compacting = true;
            LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_START,
                    NULL, page_id, page_version);
        }

        if (compacting) {
            pthread_mutex_lock(&wrap.lock);
            if (page_offset < page_size && !wrap.done && !wrap.submitted) {
                wrap.io.page_version = page_version;
                wrap.io.page_id = page_id;
                wrap.io.offset = page_offset;
                // FIXME: should be smarter about io->next (unlink at use?)
                wrap.io.next = NULL;
                wrap.submitted = true;
                wrap.miss = false;

                extstore_submit(storage, &wrap.io);
            } else if (wrap.miss) {
                LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_ABORT,
                        NULL, page_id);
                wrap.done = false;
                wrap.submitted = false;
                compacting = false;
            } else if (wrap.submitted && wrap.done) {
                LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_START,
                        NULL, page_id, page_offset);
                storage_compact_readback(storage, l, drop_unread,
                        readback_buf, page_id, page_version, settings.ext_wbuf_size);
                page_offset += settings.ext_wbuf_size;
                wrap.done = false;
                wrap.submitted = false;
            } else if (page_offset >= page_size) {
                compacting = false;
                wrap.done = false;
                wrap.submitted = false;
                extstore_close_page(storage, page_id, page_version);
                LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_END,
                        NULL, page_id);
            }
            pthread_mutex_unlock(&wrap.lock);

            if (to_sleep > MIN_STORAGE_COMPACT_SLEEP)
                to_sleep /= 2;
        } else {
            if (to_sleep < MAX_STORAGE_COMPACT_SLEEP)
                to_sleep += MIN_STORAGE_COMPACT_SLEEP;
        }
    }
    // not reached: the loop above never exits.
    free(readback_buf);

    return NULL;
}

// TODO
// logger needs logger_destroy() to exist/work before this is safe.
/*int stop_storage_compact_thread(void) {
    int ret;
    pthread_mutex_lock(&lru_maintainer_lock);
    do_run_lru_maintainer_thread = 0;
    pthread_mutex_unlock(&lru_maintainer_lock);
    if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
        fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
        return -1;
    }
    settings.lru_maintainer_thread = false;
    return 0;
}*/

void storage_compact_pause(void) {
    pthread_mutex_lock(&storage_compact_plock);
}

void storage_compact_resume(void) {
    pthread_mutex_unlock(&storage_compact_plock);
}

int start_storage_compact_thread(void *arg) {
    int ret;

    pthread_mutex_init(&storage_compact_plock, NULL);
    if ((ret = pthread_create(&storage_compact_tid, NULL,
        storage_compact_thread, arg)) != 0) {
        fprintf(stderr, "Can't create storage_compact thread: %s\n",
            strerror(ret));
        return -1;
    }

    return 0;
}

/*** UTILITY ***/
// /path/to/file:100G:bucket1
// FIXME: Modifies the argument. copy instead?
struct extstore_conf_file *storage_conf_parse(char *arg, unsigned int page_size) {
    struct extstore_conf_file *cf = NULL;
    char *b = NULL;
    char *p = strtok_r(arg, ":", &b);
    char unit = 0;
    uint64_t multiplier = 0;
    int base_size = 0;
    if (p == NULL)
        goto error;
    // First arg is the filepath.
    cf = calloc(1, sizeof(struct extstore_conf_file));
    if (cf == NULL)
        goto error;
    cf->file = strdup(p);

    p = strtok_r(NULL, ":", &b);
    if (p == NULL) {
        fprintf(stderr, "must supply size to ext_path, ie: ext_path=/f/e:64m (M|G|T|P supported)\n");
        goto error;
    }
    unit = tolower(p[strlen(p)-1]);
    p[strlen(p)-1] = '\0';
    // sigh.
    switch (unit) {
        case 'm':
            multiplier = 1024 * 1024;
            break;
        case 'g':
            multiplier = 1024 * 1024 * 1024;
            break;
        case 't':
            multiplier = 1024 * 1024;
            multiplier *= 1024 * 1024;
            break;
        case 'p':
            multiplier = 1024 * 1024;
            multiplier *= 1024 * 1024 * 1024;
            break;
        // NOTE: any other suffix leaves multiplier at 0, yielding zero pages.
    }
    base_size = atoi(p);
    multiplier *= base_size;
    // page_count is nearest-but-not-larger-than pages * psize
    cf->page_count = multiplier / page_size;
    assert(page_size * cf->page_count <= multiplier);

    // the final token would be a default free bucket
    p = strtok_r(NULL, ",", &b);
    // TODO: We reuse the original DEFINES for now,
    // but if lowttl gets split up this needs to be its own set.
    if (p != NULL) {
        if (strcmp(p, "compact") == 0) {
            cf->free_bucket = PAGE_BUCKET_COMPACT;
        } else if (strcmp(p, "lowttl") == 0) {
            cf->free_bucket = PAGE_BUCKET_LOWTTL;
        } else if (strcmp(p, "chunked") == 0) {
            cf->free_bucket = PAGE_BUCKET_CHUNKED;
        } else if (strcmp(p, "default") == 0) {
            cf->free_bucket = PAGE_BUCKET_DEFAULT;
        } else {
            fprintf(stderr, "Unknown extstore bucket: %s\n", p);
            goto error;
        }
    } else {
        // TODO: is this necessary?
        cf->free_bucket = PAGE_BUCKET_DEFAULT;
    }

    // TODO: disabling until the compaction algorithm is improved.
    if (cf->free_bucket != PAGE_BUCKET_DEFAULT) {
        fprintf(stderr, "ext_path only presently supports the default bucket\n");
        goto error;
    }

    return cf;
error:
    if (cf) {
        if (cf->file)
            free(cf->file);
        free(cf);
    }
    return NULL;
}
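
// Worked example (invented values): parsing "/data/extstore:5g" with the
// default 64MB page size yields multiplier = 5 * 2^30 = 5368709120, so
// page_count = 5368709120 / 67108864 = 80 pages, and free_bucket stays
// PAGE_BUCKET_DEFAULT since no third token was given.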

struct storage_settings {
    struct extstore_conf_file *storage_file;
    struct extstore_conf ext_cf;
};

void *storage_init_config(struct settings *s) {
    struct storage_settings *cf = calloc(1, sizeof(struct storage_settings));

    s->ext_item_size = 512;
    s->ext_item_age = UINT_MAX;
    s->ext_low_ttl = 0;
    s->ext_recache_rate = 2000;
    s->ext_max_frag = 0.8;
    s->ext_drop_unread = false;
    s->ext_wbuf_size = 1024 * 1024 * 4;
    s->ext_compact_under = 0;
    s->ext_drop_under = 0;
    s->slab_automove_freeratio = 0.01;
    s->ext_page_size = 1024 * 1024 * 64;
    s->ext_io_threadcount = 1;
    cf->ext_cf.page_size = settings.ext_page_size;
    cf->ext_cf.wbuf_size = settings.ext_wbuf_size;
    cf->ext_cf.io_threadcount = settings.ext_io_threadcount;
    cf->ext_cf.io_depth = 1;
    cf->ext_cf.page_buckets = 4;
    cf->ext_cf.wbuf_count = cf->ext_cf.page_buckets;

    return cf;
}

// TODO: pass settings struct?
int storage_read_config(void *conf, char **subopt) {
    struct storage_settings *cf = conf;
    struct extstore_conf *ext_cf = &cf->ext_cf;
    char *subopts_value;

    enum {
        EXT_PAGE_SIZE,
        EXT_WBUF_SIZE,
        EXT_THREADS,
        EXT_IO_DEPTH,
        EXT_PATH,
        EXT_ITEM_SIZE,
        EXT_ITEM_AGE,
        EXT_LOW_TTL,
        EXT_RECACHE_RATE,
        EXT_COMPACT_UNDER,
        EXT_DROP_UNDER,
        EXT_MAX_FRAG,
        EXT_DROP_UNREAD,
        SLAB_AUTOMOVE_FREERATIO, // FIXME: move this back?
    };

    char *const subopts_tokens[] = {
        [EXT_PAGE_SIZE] = "ext_page_size",
        [EXT_WBUF_SIZE] = "ext_wbuf_size",
        [EXT_THREADS] = "ext_threads",
        [EXT_IO_DEPTH] = "ext_io_depth",
        [EXT_PATH] = "ext_path",
        [EXT_ITEM_SIZE] = "ext_item_size",
        [EXT_ITEM_AGE] = "ext_item_age",
        [EXT_LOW_TTL] = "ext_low_ttl",
        [EXT_RECACHE_RATE] = "ext_recache_rate",
        [EXT_COMPACT_UNDER] = "ext_compact_under",
        [EXT_DROP_UNDER] = "ext_drop_under",
        [EXT_MAX_FRAG] = "ext_max_frag",
        [EXT_DROP_UNREAD] = "ext_drop_unread",
        [SLAB_AUTOMOVE_FREERATIO] = "slab_automove_freeratio",
        NULL
    };

    switch (getsubopt(subopt, subopts_tokens, &subopts_value)) {
        case EXT_PAGE_SIZE:
            if (cf->storage_file) {
                fprintf(stderr, "Must specify ext_page_size before any ext_path arguments\n");
                return 1;
            }
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_page_size argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &ext_cf->page_size)) {
                fprintf(stderr, "could not parse argument to ext_page_size\n");
                return 1;
            }
            ext_cf->page_size *= 1024 * 1024; /* megabytes */
            break;
        case EXT_WBUF_SIZE:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_wbuf_size argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &ext_cf->wbuf_size)) {
                fprintf(stderr, "could not parse argument to ext_wbuf_size\n");
                return 1;
            }
            ext_cf->wbuf_size *= 1024 * 1024; /* megabytes */
            settings.ext_wbuf_size = ext_cf->wbuf_size;
            break;
        case EXT_THREADS:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_threads argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &ext_cf->io_threadcount)) {
                fprintf(stderr, "could not parse argument to ext_threads\n");
                return 1;
            }
            break;
        case EXT_IO_DEPTH:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_io_depth argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &ext_cf->io_depth)) {
                fprintf(stderr, "could not parse argument to ext_io_depth\n");
                return 1;
            }
            break;
        case EXT_ITEM_SIZE:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_item_size argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_item_size)) {
                fprintf(stderr, "could not parse argument to ext_item_size\n");
                return 1;
            }
            break;
        case EXT_ITEM_AGE:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_item_age argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_item_age)) {
                fprintf(stderr, "could not parse argument to ext_item_age\n");
                return 1;
            }
            break;
        case EXT_LOW_TTL:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_low_ttl argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_low_ttl)) {
                fprintf(stderr, "could not parse argument to ext_low_ttl\n");
                return 1;
            }
            break;
        case EXT_RECACHE_RATE:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_recache_rate argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_recache_rate)) {
                fprintf(stderr, "could not parse argument to ext_recache_rate\n");
                return 1;
            }
            break;
        case EXT_COMPACT_UNDER:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_compact_under argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_compact_under)) {
                fprintf(stderr, "could not parse argument to ext_compact_under\n");
                return 1;
            }
            break;
        case EXT_DROP_UNDER:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_drop_under argument\n");
                return 1;
            }
            if (!safe_strtoul(subopts_value, &settings.ext_drop_under)) {
                fprintf(stderr, "could not parse argument to ext_drop_under\n");
                return 1;
            }
            break;
        case EXT_MAX_FRAG:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing ext_max_frag argument\n");
                return 1;
            }
            if (!safe_strtod(subopts_value, &settings.ext_max_frag)) {
                fprintf(stderr, "could not parse argument to ext_max_frag\n");
                return 1;
            }
            break;
        case SLAB_AUTOMOVE_FREERATIO:
            if (subopts_value == NULL) {
                fprintf(stderr, "Missing slab_automove_freeratio argument\n");
                return 1;
            }
            if (!safe_strtod(subopts_value, &settings.slab_automove_freeratio)) {
                fprintf(stderr, "could not parse argument to slab_automove_freeratio\n");
                return 1;
            }
            break;
        case EXT_DROP_UNREAD:
            settings.ext_drop_unread = true;
            break;
        case EXT_PATH:
            if (subopts_value) {
                struct extstore_conf_file *tmp = storage_conf_parse(subopts_value, ext_cf->page_size);
                if (tmp == NULL) {
                    fprintf(stderr, "failed to parse ext_path argument\n");
                    return 1;
                }
                if (cf->storage_file != NULL) {
                    tmp->next = cf->storage_file;
                }
                cf->storage_file = tmp;
            } else {
                fprintf(stderr, "missing argument to ext_path, ie: ext_path=/d/file:5G\n");
                return 1;
            }
            break;
        default:
            fprintf(stderr, "Illegal suboption \"%s\"\n", subopts_value);
            return 1;
    }

    return 0;
}
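
// Usage sketch (not from this file): these suboptions arrive through
// memcached's -o flag, so a typical extstore invocation looks something like:
//
//   memcached -m 1024 -o ext_path=/data/extstore:5G,ext_threads=4,ext_item_age=7200
//
// Note that ext_page_size must appear before any ext_path, since page_count
// is computed from the page size in effect when the path is parsed.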

// Returns 0 if storage is configured and consistent, 1 on a fatal conflict,
// and 2 if no ext_path was supplied (extstore stays disabled).
int storage_check_config(void *conf) {
    struct storage_settings *cf = conf;
    struct extstore_conf *ext_cf = &cf->ext_cf;

    if (cf->storage_file) {
        if (settings.item_size_max > ext_cf->wbuf_size) {
            fprintf(stderr, "-I (item_size_max: %d) cannot be larger than ext_wbuf_size: %d\n",
                settings.item_size_max, ext_cf->wbuf_size);
            return 1;
        }

        if (settings.udpport) {
            fprintf(stderr, "Cannot use UDP with extstore enabled (-U 0 to disable)\n");
            return 1;
        }

        return 0;
    }

    return 2;
}

void *storage_init(void *conf) {
    struct storage_settings *cf = conf;
    struct extstore_conf *ext_cf = &cf->ext_cf;

    enum extstore_res eres;
    void *storage = NULL;
    if (settings.ext_compact_under == 0) {
        // If changing the default fraction (4), change the help text as well.
        settings.ext_compact_under = cf->storage_file->page_count / 4;
        /* Only rescue non-COLD items if below this threshold */
        settings.ext_drop_under = cf->storage_file->page_count / 4;
    }
    crc32c_init();
    /* Init free chunks to zero. */
    for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) {
        settings.ext_free_memchunks[x] = 0;
    }
    storage = extstore_init(cf->storage_file, ext_cf, &eres);
    if (storage == NULL) {
        fprintf(stderr, "Failed to initialize external storage: %s\n",
                extstore_err(eres));
        if (eres == EXTSTORE_INIT_OPEN_FAIL) {
            perror("extstore open");
        }
        return NULL;
    }

    return storage;
}

#endif