"Fossies" - the Fresh Open Source Software Archive

Member "memcached-1.6.9/extstore.c" (21 Nov 2020, 31990 Bytes) of package /linux/www/memcached-1.6.9.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "extstore.c" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 1.6.8_vs_1.6.9.

/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */

// FIXME: config.h?
#include <stdint.h>
#include <stdbool.h>
// end FIXME
#include <stdlib.h>
#include <limits.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/uio.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include "extstore.h"
#include "config.h"

// TODO: better if an init option turns this on/off.
#ifdef EXTSTORE_DEBUG
#define E_DEBUG(...) \
    do { \
        fprintf(stderr, __VA_ARGS__); \
    } while (0)
#else
#define E_DEBUG(...)
#endif

#define STAT_L(e) pthread_mutex_lock(&e->stats_mutex);
#define STAT_UL(e) pthread_mutex_unlock(&e->stats_mutex);
#define STAT_INCR(e, stat, amount) { \
    pthread_mutex_lock(&e->stats_mutex); \
    e->stats.stat += amount; \
    pthread_mutex_unlock(&e->stats_mutex); \
}

#define STAT_DECR(e, stat, amount) { \
    pthread_mutex_lock(&e->stats_mutex); \
    e->stats.stat -= amount; \
    pthread_mutex_unlock(&e->stats_mutex); \
}

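/* Example: STAT_INCR(e, page_allocs, 1) expands to an increment of
 * e->stats.page_allocs taken under e->stats_mutex, so counter updates stay
 * consistent with the stats memcpy in extstore_get_stats() below. */
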
typedef struct __store_wbuf {
    struct __store_wbuf *next;
    char *buf;
    char *buf_pos;
    unsigned int free;
    unsigned int size;
    unsigned int offset; /* offset into page this write starts at */
    bool full; /* done writing to this page */
    bool flushed; /* whether wbuf has been flushed to disk */
} _store_wbuf;

typedef struct _store_page {
    pthread_mutex_t mutex; /* must be held for most operations */
    uint64_t obj_count; /* _delete can decrease post-closing */
    uint64_t bytes_used; /* _delete can decrease post-closing */
    uint64_t offset; /* starting address of page within fd */
    unsigned int version;
    unsigned int refcount;
    unsigned int allocated;
    unsigned int written; /* item offsets can be past written if wbuf not flushed */
    unsigned int bucket; /* which bucket the page is linked into */
    unsigned int free_bucket; /* which bucket this page returns to when freed */
    int fd;
    unsigned short id;
    bool active; /* actively being written to */
    bool closed; /* closed and draining before free */
    bool free; /* on freelist */
    _store_wbuf *wbuf; /* currently active wbuf from the stack */
    struct _store_page *next;
} store_page;

typedef struct store_engine store_engine;
typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    obj_io *queue;
    store_engine *e;
    unsigned int depth; // queue depth
} store_io_thread;

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    store_engine *e;
} store_maint_thread;

struct store_engine {
    pthread_mutex_t mutex; /* covers internal stacks and variables */
    store_page *pages; /* directly addressable page list */
    _store_wbuf *wbuf_stack; /* wbuf freelist */
    obj_io *io_stack; /* IO's to use with submitting wbuf's */
    store_io_thread *io_threads;
    store_maint_thread *maint_thread;
    store_page *page_freelist;
    store_page **page_buckets; /* stack of pages currently allocated to each bucket */
    store_page **free_page_buckets; /* stack of use-case isolated free pages */
    size_t page_size;
    unsigned int version; /* global version counter */
    unsigned int last_io_thread; /* round robin the IO threads */
    unsigned int io_threadcount; /* count of IO threads */
    unsigned int page_count;
    unsigned int page_free; /* unallocated pages */
    unsigned int page_bucketcount; /* count of potential page buckets */
    unsigned int free_page_bucketcount; /* count of free page buckets */
    unsigned int io_depth; /* FIXME: Might cache into thr struct */
    pthread_mutex_t stats_mutex;
    struct extstore_stats stats;
};

static _store_wbuf *wbuf_new(size_t size) {
    _store_wbuf *b = calloc(1, sizeof(_store_wbuf));
    if (b == NULL)
        return NULL;
    b->buf = calloc(size, sizeof(char));
    if (b->buf == NULL) {
        free(b);
        return NULL;
    }
    b->buf_pos = b->buf;
    b->free = size;
    b->size = size;
    return b;
}

static store_io_thread *_get_io_thread(store_engine *e) {
    int tid = -1;
    long long int low = LLONG_MAX;
    pthread_mutex_lock(&e->mutex);
    // find the least loaded queue. depth is read without the per-thread
    // lock since a stale value isn't fatal.
    // TODO: if average queue depth can be quickly tracked, can break as soon
    // as we see a thread that's less than average, and start from last_io_thread
    for (int x = 0; x < e->io_threadcount; x++) {
        if (e->io_threads[x].depth == 0) {
            tid = x;
            break;
        } else if (e->io_threads[x].depth < low) {
            tid = x;
            low = e->io_threads[x].depth;
        }
    }
    pthread_mutex_unlock(&e->mutex);

    return &e->io_threads[tid];
}

static uint64_t _next_version(store_engine *e) {
    return e->version++;
}

static void *extstore_io_thread(void *arg);
static void *extstore_maint_thread(void *arg);

/* Copies stats internal to engine and computes any derived values */
void extstore_get_stats(void *ptr, struct extstore_stats *st) {
    store_engine *e = (store_engine *)ptr;
    STAT_L(e);
    memcpy(st, &e->stats, sizeof(struct extstore_stats));
    STAT_UL(e);

    // grab pages_free/pages_used
    pthread_mutex_lock(&e->mutex);
    st->pages_free = e->page_free;
    st->pages_used = e->page_count - e->page_free;
    pthread_mutex_unlock(&e->mutex);
    st->io_queue = 0;
    for (int x = 0; x < e->io_threadcount; x++) {
        pthread_mutex_lock(&e->io_threads[x].mutex);
        st->io_queue += e->io_threads[x].depth;
        pthread_mutex_unlock(&e->io_threads[x].mutex);
    }
    // calculate bytes_fragmented.
    // note that open and not-yet-filled pages count against fragmentation.
    st->bytes_fragmented = st->pages_used * e->page_size -
        st->bytes_used;
}

void extstore_get_page_data(void *ptr, struct extstore_stats *st) {
    store_engine *e = (store_engine *)ptr;
    STAT_L(e);
    memcpy(st->page_data, e->stats.page_data,
            sizeof(struct extstore_page_data) * e->page_count);
    STAT_UL(e);
}

const char *extstore_err(enum extstore_res res) {
    const char *rv = "unknown error";
    switch (res) {
        case EXTSTORE_INIT_BAD_WBUF_SIZE:
            rv = "page_size must be divisible by wbuf_size";
            break;
        case EXTSTORE_INIT_NEED_MORE_WBUF:
            rv = "wbuf_count must be >= page_buckets";
            break;
        case EXTSTORE_INIT_NEED_MORE_BUCKETS:
            rv = "page_buckets must be > 0";
            break;
        case EXTSTORE_INIT_PAGE_WBUF_ALIGNMENT:
            rv = "page_size and wbuf_size must be divisible by 1024*1024*2";
            break;
        case EXTSTORE_INIT_TOO_MANY_PAGES:
            rv = "page_count must total to < 65536. Increase page_size or lower path sizes";
            break;
        case EXTSTORE_INIT_OOM:
            rv = "failed calloc for engine";
            break;
        case EXTSTORE_INIT_OPEN_FAIL:
            rv = "failed to open file";
            break;
        case EXTSTORE_INIT_THREAD_FAIL:
            rv = "failed to start IO or maintenance thread";
            break;
    }
    return rv;
}

// TODO: #define's for DEFAULT_BUCKET, FREE_VERSION, etc
void *extstore_init(struct extstore_conf_file *fh, struct extstore_conf *cf,
        enum extstore_res *res) {
    int i;
    struct extstore_conf_file *f = NULL;
    pthread_t thread;

    if (cf->page_size % cf->wbuf_size != 0) {
        *res = EXTSTORE_INIT_BAD_WBUF_SIZE;
        return NULL;
    }
    // Should ensure at least one write buffer per potential page
    if (cf->page_buckets > cf->wbuf_count) {
        *res = EXTSTORE_INIT_NEED_MORE_WBUF;
        return NULL;
    }
    if (cf->page_buckets < 1) {
        *res = EXTSTORE_INIT_NEED_MORE_BUCKETS;
        return NULL;
    }

    // TODO: More intelligence around alignment of flash erasure block sizes
    if (cf->page_size % (1024 * 1024 * 2) != 0 ||
        cf->wbuf_size % (1024 * 1024 * 2) != 0) {
        *res = EXTSTORE_INIT_PAGE_WBUF_ALIGNMENT;
        return NULL;
    }

    store_engine *e = calloc(1, sizeof(store_engine));
    if (e == NULL) {
        *res = EXTSTORE_INIT_OOM;
        return NULL;
    }

    e->page_size = cf->page_size;
    uint64_t temp_page_count = 0;
    for (f = fh; f != NULL; f = f->next) {
        f->fd = open(f->file, O_RDWR | O_CREAT, 0644);
        if (f->fd < 0) {
            *res = EXTSTORE_INIT_OPEN_FAIL;
#ifdef EXTSTORE_DEBUG
            perror("extstore open");
#endif
            free(e);
            return NULL;
        }
        // use an fcntl lock to help avoid double starting.
        struct flock lock;
        lock.l_type = F_WRLCK;
        lock.l_start = 0;
        lock.l_whence = SEEK_SET;
        lock.l_len = 0;
        if (fcntl(f->fd, F_SETLK, &lock) < 0) {
            *res = EXTSTORE_INIT_OPEN_FAIL;
            free(e);
            return NULL;
        }
        if (ftruncate(f->fd, 0) < 0) {
            *res = EXTSTORE_INIT_OPEN_FAIL;
            free(e);
            return NULL;
        }

        temp_page_count += f->page_count;
        f->offset = 0;
    }

    if (temp_page_count >= UINT16_MAX) {
        *res = EXTSTORE_INIT_TOO_MANY_PAGES;
        free(e);
        return NULL;
    }
    e->page_count = temp_page_count;

    e->pages = calloc(e->page_count, sizeof(store_page));
    if (e->pages == NULL) {
        *res = EXTSTORE_INIT_OOM;
        // FIXME: loop-close. make error label
        free(e);
        return NULL;
    }

    // interleave the pages between devices
    f = NULL; // start at the first device.
    for (i = 0; i < e->page_count; i++) {
        // find next device with available pages
        while (1) {
            // restart the loop
            if (f == NULL || f->next == NULL) {
                f = fh;
            } else {
                f = f->next;
            }
            if (f->page_count) {
                f->page_count--;
                break;
            }
        }
        pthread_mutex_init(&e->pages[i].mutex, NULL);
        e->pages[i].id = i;
        e->pages[i].fd = f->fd;
        e->pages[i].free_bucket = f->free_bucket;
        e->pages[i].offset = f->offset;
        e->pages[i].free = true;
        f->offset += e->page_size;
    }

    // free page buckets allows the app to organize devices by use case
    e->free_page_buckets = calloc(cf->page_buckets, sizeof(store_page *));
    e->free_page_bucketcount = cf->page_buckets;

    for (i = e->page_count-1; i > 0; i--) {
        e->page_free++;
        if (e->pages[i].free_bucket == 0) {
            e->pages[i].next = e->page_freelist;
            e->page_freelist = &e->pages[i];
        } else {
            int fb = e->pages[i].free_bucket;
            e->pages[i].next = e->free_page_buckets[fb];
            e->free_page_buckets[fb] = &e->pages[i];
        }
    }

    // 0 is magic "page is freed" version
    e->version = 1;

    // scratch data for stats. TODO: malloc failure handle
    e->stats.page_data =
        calloc(e->page_count, sizeof(struct extstore_page_data));
    e->stats.page_count = e->page_count;
    e->stats.page_size = e->page_size;

    // page buckets lazily have pages assigned into them
    e->page_buckets = calloc(cf->page_buckets, sizeof(store_page *));
    e->page_bucketcount = cf->page_buckets;

    // allocate write buffers
    // also IO's to use for shipping to IO thread
    for (i = 0; i < cf->wbuf_count; i++) {
        _store_wbuf *w = wbuf_new(cf->wbuf_size);
        obj_io *io = calloc(1, sizeof(obj_io));
        /* TODO: on error, loop again and free stack. */
        w->next = e->wbuf_stack;
        e->wbuf_stack = w;
        io->next = e->io_stack;
        e->io_stack = io;
    }

    pthread_mutex_init(&e->mutex, NULL);
    pthread_mutex_init(&e->stats_mutex, NULL);

    e->io_depth = cf->io_depth;

    // spawn threads
    e->io_threads = calloc(cf->io_threadcount, sizeof(store_io_thread));
    for (i = 0; i < cf->io_threadcount; i++) {
        pthread_mutex_init(&e->io_threads[i].mutex, NULL);
        pthread_cond_init(&e->io_threads[i].cond, NULL);
        e->io_threads[i].e = e;
        // FIXME: error handling
        pthread_create(&thread, NULL, extstore_io_thread, &e->io_threads[i]);
    }
    e->io_threadcount = cf->io_threadcount;

    e->maint_thread = calloc(1, sizeof(store_maint_thread));
    e->maint_thread->e = e;
    // FIXME: error handling
    pthread_mutex_init(&e->maint_thread->mutex, NULL);
    pthread_cond_init(&e->maint_thread->cond, NULL);
    pthread_create(&thread, NULL, extstore_maint_thread, e->maint_thread);

    extstore_run_maint(e);

    return (void *)e;
}

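/* Illustrative usage sketch (not from the memcached tree): a minimal init
 * call, assuming the extstore_conf/extstore_conf_file layouts from
 * extstore.h; only fields referenced in this file are used, and the sizes
 * follow the divisibility and 2MB alignment rules checked above.
 *
 *   struct extstore_conf_file fh = {0};
 *   fh.file = "/tmp/extstore.data";   // hypothetical backing file
 *   fh.page_count = 64;               // 64 pages * 64MB = 4GB of storage
 *   struct extstore_conf cf = {0};
 *   cf.page_size = 1024 * 1024 * 64;  // 64MB, a multiple of 2MB
 *   cf.wbuf_size = 1024 * 1024 * 8;   // divides page_size evenly
 *   cf.wbuf_count = 4;                // must be >= page_buckets
 *   cf.page_buckets = 4;
 *   cf.io_threadcount = 1;
 *   cf.io_depth = 1;
 *   enum extstore_res res;
 *   void *e = extstore_init(&fh, &cf, &res);
 *   if (e == NULL)
 *       fprintf(stderr, "extstore: %s\n", extstore_err(res));
 */
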
void extstore_run_maint(void *ptr) {
    store_engine *e = (store_engine *)ptr;
    pthread_cond_signal(&e->maint_thread->cond);
}

// call with *e locked
static store_page *_allocate_page(store_engine *e, unsigned int bucket,
        unsigned int free_bucket) {
    assert(!e->page_buckets[bucket] || e->page_buckets[bucket]->allocated == e->page_size);
    store_page *tmp = NULL;
    // if a specific free bucket was requested, check there first
    if (free_bucket != 0 && e->free_page_buckets[free_bucket] != NULL) {
        assert(e->page_free > 0);
        tmp = e->free_page_buckets[free_bucket];
        e->free_page_buckets[free_bucket] = tmp->next;
    }
    // failing that, try the global list.
    if (tmp == NULL && e->page_freelist != NULL) {
        tmp = e->page_freelist;
        e->page_freelist = tmp->next;
    }
    E_DEBUG("EXTSTORE: allocating new page\n");
    // page_freelist can be empty if the only free pages are specialized and
    // we didn't just request one.
    if (e->page_free > 0 && tmp != NULL) {
        tmp->next = e->page_buckets[bucket];
        e->page_buckets[bucket] = tmp;
        tmp->active = true;
        tmp->free = false;
        tmp->closed = false;
        tmp->version = _next_version(e);
        tmp->bucket = bucket;
        e->page_free--;
        STAT_INCR(e, page_allocs, 1);
    } else {
        extstore_run_maint(e);
    }
    if (tmp)
        E_DEBUG("EXTSTORE: got page %u\n", tmp->id);
    return tmp;
}

// call with *p locked. locks *e
static void _allocate_wbuf(store_engine *e, store_page *p) {
    _store_wbuf *wbuf = NULL;
    assert(!p->wbuf);
    pthread_mutex_lock(&e->mutex);
    if (e->wbuf_stack) {
        wbuf = e->wbuf_stack;
        e->wbuf_stack = wbuf->next;
        wbuf->next = 0;
    }
    pthread_mutex_unlock(&e->mutex);
    if (wbuf) {
        wbuf->offset = p->allocated;
        p->allocated += wbuf->size;
        wbuf->free = wbuf->size;
        wbuf->buf_pos = wbuf->buf;
        wbuf->full = false;
        wbuf->flushed = false;

        p->wbuf = wbuf;
    }
}

/* Callback run after a wbuf is flushed. wbufs can only be removed from the
 * head onward once successfully flushed, which complicates this routine:
 * each callback attempts to free the wbuf stack, which finally succeeds when
 * the head wbuf's callback fires.
 * Flushes rarely happen out of order.
 */
static void _wbuf_cb(void *ep, obj_io *io, int ret) {
    store_engine *e = (store_engine *)ep;
    store_page *p = &e->pages[io->page_id];
    _store_wbuf *w = (_store_wbuf *) io->data;

    // TODO: Examine return code. Not entirely sure how to handle errors.
    // Naive first-pass should probably cause the page to close/free.
    w->flushed = true;
    pthread_mutex_lock(&p->mutex);
    assert(p->wbuf != NULL && p->wbuf == w);
    assert(p->written == w->offset);
    p->written += w->size;
    p->wbuf = NULL;

    if (p->written == e->page_size)
        p->active = false;

    // return the wbuf
    pthread_mutex_lock(&e->mutex);
    w->next = e->wbuf_stack;
    e->wbuf_stack = w;
    // also return the IO we just used.
    io->next = e->io_stack;
    e->io_stack = io;
    pthread_mutex_unlock(&e->mutex);
    pthread_mutex_unlock(&p->mutex);
}

/* Wraps a page's current wbuf in an io and submits it to an IO thread.
 * Called with p locked; locks e.
 */
static void _submit_wbuf(store_engine *e, store_page *p) {
    _store_wbuf *w;
    pthread_mutex_lock(&e->mutex);
    obj_io *io = e->io_stack;
    e->io_stack = io->next;
    pthread_mutex_unlock(&e->mutex);
    w = p->wbuf;

    // zero out the end of the wbuf to allow blind readback of data.
    memset(w->buf + (w->size - w->free), 0, w->free);

    io->next = NULL;
    io->mode = OBJ_IO_WRITE;
    io->page_id = p->id;
    io->data = w;
    io->offset = w->offset;
    io->len = w->size;
    io->buf = w->buf;
    io->cb = _wbuf_cb;

    extstore_submit(e, io);
}

/* engine write function; takes engine, item_io.
 * fast fail if no available write buffer (flushing)
 * lock engine context, find active page, unlock
 * if page full, submit page/buffer to io thread.
 *
 * write is designed to be flaky; if page full, caller must try again to get
 * new page. best if used from a background thread that can harmlessly retry.
 */

int extstore_write_request(void *ptr, unsigned int bucket,
        unsigned int free_bucket, obj_io *io) {
    store_engine *e = (store_engine *)ptr;
    store_page *p;
    int ret = -1;
    if (bucket >= e->page_bucketcount)
        return ret;

    pthread_mutex_lock(&e->mutex);
    p = e->page_buckets[bucket];
    if (!p) {
        p = _allocate_page(e, bucket, free_bucket);
    }
    pthread_mutex_unlock(&e->mutex);
    if (!p)
        return ret;

    pthread_mutex_lock(&p->mutex);

    // FIXME: can't null out page_buckets!!!
    // page is full, clear bucket and retry later.
    if (!p->active ||
            ((!p->wbuf || p->wbuf->full) && p->allocated >= e->page_size)) {
        pthread_mutex_unlock(&p->mutex);
        pthread_mutex_lock(&e->mutex);
        _allocate_page(e, bucket, free_bucket);
        pthread_mutex_unlock(&e->mutex);
        return ret;
    }

    // if io won't fit, submit IO for wbuf and find new one.
    if (p->wbuf && p->wbuf->free < io->len && !p->wbuf->full) {
        _submit_wbuf(e, p);
        p->wbuf->full = true;
    }

    if (!p->wbuf && p->allocated < e->page_size) {
        _allocate_wbuf(e, p);
    }

    // hand over buffer for caller to copy into
    // leaves p locked.
    if (p->wbuf && !p->wbuf->full && p->wbuf->free >= io->len) {
        io->buf = p->wbuf->buf_pos;
        io->page_id = p->id;
        return 0;
    }

    pthread_mutex_unlock(&p->mutex);
    // p->written is incremented post-wbuf flush
    return ret;
}

/* _must_ be called after a successful write_request.
 * fills the rest of io structure.
 */
void extstore_write(void *ptr, obj_io *io) {
    store_engine *e = (store_engine *)ptr;
    store_page *p = &e->pages[io->page_id];

    io->offset = p->wbuf->offset + (p->wbuf->size - p->wbuf->free);
    io->page_version = p->version;
    p->wbuf->buf_pos += io->len;
    p->wbuf->free -= io->len;
    p->bytes_used += io->len;
    p->obj_count++;
    STAT_L(e);
    e->stats.bytes_written += io->len;
    e->stats.bytes_used += io->len;
    e->stats.objects_written++;
    e->stats.objects_used++;
    STAT_UL(e);

    pthread_mutex_unlock(&p->mutex);
}

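/* Illustrative caller sketch (not from the memcached tree): the two-phase
 * write pattern described above. extstore_write_request() hands back a slice
 * of the active wbuf in io->buf with the page lock held; the caller copies
 * the object in and extstore_write() commits it and releases the lock. The
 * object_data/object_len names are hypothetical.
 *
 *   obj_io io = {0};
 *   io.len = object_len;
 *   if (extstore_write_request(e, 0, 0, &io) == 0) {
 *       // success: p->mutex is held until extstore_write() below.
 *       memcpy(io.buf, object_data, object_len);
 *       extstore_write(e, &io); // fills io.offset and io.page_version
 *   } else {
 *       // no page or buffer available yet; retry later, since the
 *       // maintenance thread may need to free or allocate a page first.
 *   }
 */
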
/* engine submit function; takes engine, item_io stack.
 * lock io_thread context and add stack?
 * signal io thread to wake.
 * return success.
 */
int extstore_submit(void *ptr, obj_io *io) {
    store_engine *e = (store_engine *)ptr;
    store_io_thread *t = _get_io_thread(e);

    pthread_mutex_lock(&t->mutex);
    if (t->queue == NULL) {
        t->queue = io;
    } else {
        /* Have to put the *io stack at the end of current queue.
         * FIXME: Optimize by tracking tail.
         */
        obj_io *tmp = t->queue;
        while (tmp->next != NULL) {
            tmp = tmp->next;
            assert(tmp != t->queue);
        }
        tmp->next = io;
    }
    // TODO: extstore_submit(ptr, io, count)
    obj_io *tio = io;
    while (tio != NULL) {
        t->depth++;
        tio = tio->next;
    }
    pthread_mutex_unlock(&t->mutex);

    //pthread_mutex_lock(&t->mutex);
    pthread_cond_signal(&t->cond);
    //pthread_mutex_unlock(&t->mutex);
    return 0;
}

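/* Illustrative caller sketch (not from the memcached tree): submitting an
 * asynchronous read. The caller tracks page_id/page_version/offset/len from
 * a prior write, since extstore keeps no object index; the saved_* names are
 * hypothetical. The obj_io and buffer must stay valid until the callback
 * runs on the IO thread, where ret is the byte count read, or -2 if the
 * page was freed, closed, or recycled under a newer version.
 *
 *   static void read_cb(void *e, obj_io *io, int ret) {
 *       // consume io->buf on success (ret > 0); treat ret < 0 as a miss
 *   }
 *
 *   obj_io *io = calloc(1, sizeof(obj_io));
 *   io->mode = OBJ_IO_READ;
 *   io->page_id = saved_page_id;
 *   io->page_version = saved_page_version;
 *   io->offset = saved_offset;
 *   io->len = saved_len;
 *   io->buf = malloc(saved_len);
 *   io->cb = read_cb;
 *   extstore_submit(e, io);
 */
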
/* engine note delete function: takes engine, page id, size?
 * note that an item in this page is no longer valid
 */
int extstore_delete(void *ptr, unsigned int page_id, uint64_t page_version,
        unsigned int count, unsigned int bytes) {
    store_engine *e = (store_engine *)ptr;
    // FIXME: validate page_id in bounds
    store_page *p = &e->pages[page_id];
    int ret = 0;

    pthread_mutex_lock(&p->mutex);
    if (!p->closed && p->version == page_version) {
        if (p->bytes_used >= bytes) {
            p->bytes_used -= bytes;
        } else {
            p->bytes_used = 0;
        }

        if (p->obj_count >= count) {
            p->obj_count -= count;
        } else {
            p->obj_count = 0; // caller has bad accounting?
        }
        STAT_L(e);
        e->stats.bytes_used -= bytes;
        e->stats.objects_used -= count;
        STAT_UL(e);

        if (p->obj_count == 0) {
            extstore_run_maint(e);
        }
    } else {
        ret = -1;
    }
    pthread_mutex_unlock(&p->mutex);
    return ret;
}

int extstore_check(void *ptr, unsigned int page_id, uint64_t page_version) {
    store_engine *e = (store_engine *)ptr;
    store_page *p = &e->pages[page_id];
    int ret = 0;

    pthread_mutex_lock(&p->mutex);
    if (p->version != page_version)
        ret = -1;
    pthread_mutex_unlock(&p->mutex);
    return ret;
}

/* allows a compactor to say "we're done with this page, kill it." */
void extstore_close_page(void *ptr, unsigned int page_id, uint64_t page_version) {
    store_engine *e = (store_engine *)ptr;
    store_page *p = &e->pages[page_id];

    pthread_mutex_lock(&p->mutex);
    if (!p->closed && p->version == page_version) {
        p->closed = true;
        extstore_run_maint(e);
    }
    pthread_mutex_unlock(&p->mutex);
}

/* Finds an attached wbuf that can satisfy the read.
 * Since wbufs can potentially be flushed to disk out of order, they are only
 * removed as the head of the list successfully flushes to disk.
 */
// call with *p locked
// FIXME: protect from reading past wbuf
static inline int _read_from_wbuf(store_page *p, obj_io *io) {
    _store_wbuf *wbuf = p->wbuf;
    assert(wbuf != NULL);
    assert(io->offset < p->written + wbuf->size);
    if (io->iov == NULL) {
        memcpy(io->buf, wbuf->buf + (io->offset - wbuf->offset), io->len);
    } else {
        int x;
        unsigned int off = io->offset - wbuf->offset;
        // need to loop fill iovecs
        for (x = 0; x < io->iovcnt; x++) {
            struct iovec *iov = &io->iov[x];
            memcpy(iov->iov_base, wbuf->buf + off, iov->iov_len);
            off += iov->iov_len;
        }
    }
    return io->len;
}

/* engine IO thread; takes engine context
 * manage writes/reads
 * runs IO callbacks inline after each IO
 */
// FIXME: protect from reading past page
static void *extstore_io_thread(void *arg) {
    store_io_thread *me = (store_io_thread *)arg;
    store_engine *e = me->e;
    while (1) {
        obj_io *io_stack = NULL;
        pthread_mutex_lock(&me->mutex);
        if (me->queue == NULL) {
            pthread_cond_wait(&me->cond, &me->mutex);
        }

        // Pull and disconnect a batch from the queue
        if (me->queue != NULL) {
            int i;
            obj_io *end = NULL;
            io_stack = me->queue;
            end = io_stack;
            for (i = 1; i < e->io_depth; i++) {
                if (end->next) {
                    end = end->next;
                } else {
                    break;
                }
            }
            me->depth -= i;
            me->queue = end->next;
            end->next = NULL;
        }
        pthread_mutex_unlock(&me->mutex);

        obj_io *cur_io = io_stack;
        while (cur_io) {
            // We need to note next before the callback in case the obj_io
            // gets reused.
            obj_io *next = cur_io->next;
            int ret = 0;
            int do_op = 1;
            store_page *p = &e->pages[cur_io->page_id];
            // TODO: loop if not enough bytes were read/written.
            switch (cur_io->mode) {
                case OBJ_IO_READ:
                    // If the page is still open, a read past the flushed
                    // boundary is served from the wbuf instead of the disk.
                    pthread_mutex_lock(&p->mutex);
                    if (!p->free && !p->closed && p->version == cur_io->page_version) {
                        if (p->active && cur_io->offset >= p->written) {
                            ret = _read_from_wbuf(p, cur_io);
                            do_op = 0;
                        } else {
                            p->refcount++;
                        }
                        STAT_L(e);
                        e->stats.bytes_read += cur_io->len;
                        e->stats.objects_read++;
                        STAT_UL(e);
                    } else {
                        do_op = 0;
                        ret = -2; // TODO: enum in IO for status?
                    }
                    pthread_mutex_unlock(&p->mutex);
                    if (do_op) {
#if !defined(HAVE_PREAD) || !defined(HAVE_PREADV)
                        // TODO: lseek offset is natively 64-bit on OS X, but
                        // perhaps not on all platforms? Else use lseek64()
                        ret = lseek(p->fd, p->offset + cur_io->offset, SEEK_SET);
                        if (ret >= 0) {
                            if (cur_io->iov == NULL) {
                                ret = read(p->fd, cur_io->buf, cur_io->len);
                            } else {
                                ret = readv(p->fd, cur_io->iov, cur_io->iovcnt);
                            }
                        }
#else
                        if (cur_io->iov == NULL) {
                            ret = pread(p->fd, cur_io->buf, cur_io->len, p->offset + cur_io->offset);
                        } else {
                            ret = preadv(p->fd, cur_io->iov, cur_io->iovcnt, p->offset + cur_io->offset);
                        }
#endif
                    }
                    break;
                case OBJ_IO_WRITE:
                    do_op = 0;
                    // FIXME: Should hold refcount during write. doesn't
                    // currently matter since page can't free while active.
                    ret = pwrite(p->fd, cur_io->buf, cur_io->len, p->offset + cur_io->offset);
                    break;
            }
            if (ret == 0) {
                E_DEBUG("read returned nothing\n");
            }

#ifdef EXTSTORE_DEBUG
            if (ret == -1) {
                perror("read/write op failed");
            }
#endif
            cur_io->cb(e, cur_io, ret);
            if (do_op) {
                pthread_mutex_lock(&p->mutex);
                p->refcount--;
                pthread_mutex_unlock(&p->mutex);
            }
            cur_io = next;
        }
    }

    return NULL;
}

// call with *p locked.
static void _free_page(store_engine *e, store_page *p) {
    store_page *tmp = NULL;
    store_page *prev = NULL;
    E_DEBUG("EXTSTORE: freeing page %u\n", p->id);
    STAT_L(e);
    e->stats.objects_used -= p->obj_count;
    e->stats.bytes_used -= p->bytes_used;
    e->stats.page_reclaims++;
    STAT_UL(e);
    pthread_mutex_lock(&e->mutex);
    // unlink page from bucket list
    tmp = e->page_buckets[p->bucket];
    while (tmp) {
        if (tmp == p) {
            if (prev) {
                prev->next = tmp->next;
            } else {
                e->page_buckets[p->bucket] = tmp->next;
            }
            tmp->next = NULL;
            break;
        }
        prev = tmp;
        tmp = tmp->next;
    }
    // reset most values
    p->version = 0;
    p->obj_count = 0;
    p->bytes_used = 0;
    p->allocated = 0;
    p->written = 0;
    p->bucket = 0;
    p->active = false;
    p->closed = false;
    p->free = true;
    // add to page stack
    // TODO: free_page_buckets first class and remove redundancy?
    if (p->free_bucket != 0) {
        p->next = e->free_page_buckets[p->free_bucket];
        e->free_page_buckets[p->free_bucket] = p;
    } else {
        p->next = e->page_freelist;
        e->page_freelist = p;
    }
    e->page_free++;
    pthread_mutex_unlock(&e->mutex);
}

/* engine maint thread; takes engine context.
 * Uses version to ensure oldest possible objects are being evicted.
 * Needs interface to inform owner of pages with fewer objects or most space
 * free, which can then be actively compacted to avoid eviction.
 *
 * This gets called asynchronously after every page allocation. Could run less
 * often if more pages are free.
 *
 * Another allocation call is required if an attempted free didn't happen
 * due to the page having a refcount.
 */

// TODO: Don't over-evict pages if waiting on refcounts to drop
static void *extstore_maint_thread(void *arg) {
    store_maint_thread *me = (store_maint_thread *)arg;
    store_engine *e = me->e;
    struct extstore_page_data *pd =
        calloc(e->page_count, sizeof(struct extstore_page_data));
    pthread_mutex_lock(&me->mutex);
    while (1) {
        int i;
        bool do_evict = false;
        unsigned int low_page = 0;
        uint64_t low_version = ULLONG_MAX;

        pthread_cond_wait(&me->cond, &me->mutex);
        pthread_mutex_lock(&e->mutex);
        // default freelist requires at least one page free.
        // specialized freelists fall back to default once full.
        if (e->page_free == 0 || e->page_freelist == NULL) {
            do_evict = true;
        }
        pthread_mutex_unlock(&e->mutex);
        memset(pd, 0, sizeof(struct extstore_page_data) * e->page_count);

        for (i = 0; i < e->page_count; i++) {
            store_page *p = &e->pages[i];
            pthread_mutex_lock(&p->mutex);
            pd[p->id].free_bucket = p->free_bucket;
            if (p->active || p->free) {
                pthread_mutex_unlock(&p->mutex);
                continue;
            }
            if (p->obj_count > 0 && !p->closed) {
                pd[p->id].version = p->version;
                pd[p->id].bytes_used = p->bytes_used;
                pd[p->id].bucket = p->bucket;
                // low_version/low_page are only used in the eviction
                // scenario. when we evict, it's only to fill the default page
                // bucket again.
                // TODO: experiment with allowing evicting up to a single page
                // for any specific free bucket. this is *probably* required
                // since it could cause a load bias on default-only devices?
                if (p->free_bucket == 0 && p->version < low_version) {
                    low_version = p->version;
                    low_page = i;
                }
            }
            if ((p->obj_count == 0 || p->closed) && p->refcount == 0) {
                _free_page(e, p);
                // Found a page to free, no longer need to evict.
                do_evict = false;
            }
            pthread_mutex_unlock(&p->mutex);
        }

        if (do_evict && low_version != ULLONG_MAX) {
            store_page *p = &e->pages[low_page];
            E_DEBUG("EXTSTORE: evicting page [%d] [v: %llu]\n",
                    p->id, (unsigned long long) p->version);
            pthread_mutex_lock(&p->mutex);
            if (!p->closed) {
                p->closed = true;
                STAT_L(e);
                e->stats.page_evictions++;
                e->stats.objects_evicted += p->obj_count;
                e->stats.bytes_evicted += p->bytes_used;
                STAT_UL(e);
                if (p->refcount == 0) {
                    _free_page(e, p);
                }
            }
            pthread_mutex_unlock(&p->mutex);
        }

        // copy the page data into engine context so callers can read it
        // under the stats lock.
        STAT_L(e);
        memcpy(e->stats.page_data, pd,
                sizeof(struct extstore_page_data) * e->page_count);
        STAT_UL(e);
    }

    return NULL;
}