"Fossies" - the Fresh Open Source Software Archive

Member "memcached-1.6.15/extstore.c" (24 Feb 2022, 32288 Bytes) of package /linux/www/memcached-1.6.15.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "extstore.c" see the Fossies "Dox" file reference documentation and the last Fossies "Diffs" side-by-side code changes report: 1.6.12_vs_1.6.13.

    1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
    2 
    3 // FIXME: config.h?
    4 #include <stdint.h>
    5 #include <stdbool.h>
    6 // end FIXME
    7 #include <stdlib.h>
    8 #include <limits.h>
    9 #include <pthread.h>
   10 #include <sys/types.h>
   11 #include <sys/stat.h>
   12 #include <sys/uio.h>
   13 #include <fcntl.h>
   14 #include <unistd.h>
   15 #include <stdio.h>
   16 #include <string.h>
   17 #include <assert.h>
   18 #include "extstore.h"
   19 #include "config.h"
   20 
   21 // TODO: better if an init option turns this on/off.
   22 #ifdef EXTSTORE_DEBUG
   23 #define E_DEBUG(...) \
   24     do { \
   25         fprintf(stderr, __VA_ARGS__); \
   26     } while (0)
   27 #else
   28 #define E_DEBUG(...)
   29 #endif
   30 
   31 #define STAT_L(e) pthread_mutex_lock(&e->stats_mutex);
   32 #define STAT_UL(e) pthread_mutex_unlock(&e->stats_mutex);
   33 #define STAT_INCR(e, stat, amount) { \
   34     pthread_mutex_lock(&e->stats_mutex); \
   35     e->stats.stat += amount; \
   36     pthread_mutex_unlock(&e->stats_mutex); \
   37 }
   38 
   39 #define STAT_DECR(e, stat, amount) { \
   40     pthread_mutex_lock(&e->stats_mutex); \
   41     e->stats.stat -= amount; \
   42     pthread_mutex_unlock(&e->stats_mutex); \
   43 }
   44 
   45 typedef struct __store_wbuf {
   46     struct __store_wbuf *next;
   47     char *buf;
   48     char *buf_pos;
   49     unsigned int free;
   50     unsigned int size;
   51     unsigned int offset; /* offset into page this write starts at */
   52     bool full; /* done writing to this page */
   53     bool flushed; /* whether wbuf has been flushed to disk */
   54 } _store_wbuf;
   55 
   56 typedef struct _store_page {
   57     pthread_mutex_t mutex; /* Need to be held for most operations */
   58     uint64_t obj_count; /* _delete can decrease post-closing */
   59     uint64_t bytes_used; /* _delete can decrease post-closing */
   60     uint64_t offset; /* starting address of page within fd */
   61     unsigned int version;
   62     unsigned int refcount;
   63     unsigned int allocated;
   64     unsigned int written; /* item offsets can be past written if wbuf not flushed */
   65     unsigned int bucket; /* which bucket the page is linked into */
   66     unsigned int free_bucket; /* which bucket this page returns to when freed */
   67     int fd;
   68     unsigned short id;
   69     bool active; /* actively being written to */
   70     bool closed; /* closed and draining before free */
   71     bool free; /* on freelist */
   72     _store_wbuf *wbuf; /* currently active wbuf from the stack */
   73     struct _store_page *next;
   74 } store_page;
   75 
   76 typedef struct store_engine store_engine;
   77 typedef struct {
   78     pthread_mutex_t mutex;
   79     pthread_cond_t cond;
   80     obj_io *queue;
   81     obj_io *queue_tail;
   82     store_engine *e;
   83     unsigned int depth; // queue depth
   84 } store_io_thread;
   85 
   86 typedef struct {
   87     pthread_mutex_t mutex;
   88     pthread_cond_t cond;
   89     store_engine *e;
   90 } store_maint_thread;
   91 
   92 struct store_engine {
   93     pthread_mutex_t mutex; /* covers internal stacks and variables */
   94     store_page *pages; /* directly addressable page list */
   95     _store_wbuf *wbuf_stack; /* wbuf freelist */
   96     obj_io *io_stack; /* IO's to use with submitting wbuf's */
   97     store_io_thread *io_threads;
   98     store_maint_thread *maint_thread;
   99     store_page *page_freelist;
  100     store_page **page_buckets; /* stack of pages currently allocated to each bucket */
  101     store_page **free_page_buckets; /* stack of use-case isolated free pages */
  102     size_t page_size;
  103     unsigned int version; /* global version counter */
  104     unsigned int last_io_thread; /* round robin the IO threads */
  105     unsigned int io_threadcount; /* count of IO threads */
  106     unsigned int page_count;
  107     unsigned int page_free; /* unallocated pages */
  108     unsigned int page_bucketcount; /* count of potential page buckets */
  109     unsigned int free_page_bucketcount; /* count of free page buckets */
  110     unsigned int io_depth; /* FIXME: Might cache into thr struct */
  111     pthread_mutex_t stats_mutex;
  112     struct extstore_stats stats;
  113 };
  114 
  115 static _store_wbuf *wbuf_new(size_t size) {
  116     _store_wbuf *b = calloc(1, sizeof(_store_wbuf));
  117     if (b == NULL)
  118         return NULL;
  119     b->buf = calloc(size, sizeof(char));
  120     if (b->buf == NULL) {
  121         free(b);
  122         return NULL;
  123     }
  124     b->buf_pos = b->buf;
  125     b->free = size;
  126     b->size = size;
  127     return b;
  128 }
  129 
  130 static store_io_thread *_get_io_thread(store_engine *e) {
  131     int tid = -1;
  132     long long int low = LLONG_MAX;
  133     pthread_mutex_lock(&e->mutex);
  134     // find smallest queue. depth is read without the thread's lock; being a bit off isn't fatal.
  135     // TODO: if average queue depth can be quickly tracked, can break as soon
  136     // as we see a thread that's less than average, and start from last_io_thread
  137     for (int x = 0; x < e->io_threadcount; x++) {
  138         if (e->io_threads[x].depth == 0) {
  139             tid = x;
  140             break;
  141         } else if (e->io_threads[x].depth < low) {
  142             tid = x;
  143             low = e->io_threads[x].depth;
  144         }
  145     }
  146     pthread_mutex_unlock(&e->mutex);
  147 
  148     return &e->io_threads[tid];
  149 }
  150 
  151 static uint64_t _next_version(store_engine *e) {
  152     return e->version++;
  153 }
  154 
  155 static void *extstore_io_thread(void *arg);
  156 static void *extstore_maint_thread(void *arg);
  157 
  158 /* Copies stats internal to engine and computes any derived values */
  159 void extstore_get_stats(void *ptr, struct extstore_stats *st) {
  160     store_engine *e = (store_engine *)ptr;
  161     STAT_L(e);
  162     memcpy(st, &e->stats, sizeof(struct extstore_stats));
  163     STAT_UL(e);
  164 
  165     // grab pages_free/pages_used
  166     pthread_mutex_lock(&e->mutex);
  167     st->pages_free = e->page_free;
  168     st->pages_used = e->page_count - e->page_free;
  169     pthread_mutex_unlock(&e->mutex);
  170     st->io_queue = 0;
  171     for (int x = 0; x < e->io_threadcount; x++) {
  172         pthread_mutex_lock(&e->io_threads[x].mutex);
  173         st->io_queue += e->io_threads[x].depth;
  174         pthread_mutex_unlock(&e->io_threads[x].mutex);
  175     }
  176     // calculate bytes_fragmented.
  177     // note that open, not-yet-filled pages count against fragmentation.
  178     st->bytes_fragmented = st->pages_used * e->page_size -
  179         st->bytes_used;
  180 }
  181 
  182 void extstore_get_page_data(void *ptr, struct extstore_stats *st) {
  183     store_engine *e = (store_engine *)ptr;
  184     STAT_L(e);
  185     memcpy(st->page_data, e->stats.page_data,
  186             sizeof(struct extstore_page_data) * e->page_count);
  187     STAT_UL(e);
  188 }
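
/* Illustrative caller sketch, not part of the engine itself: polling stats.
 * The fields and the need to pre-size st.page_data to page_count entries
 * follow from the two functions above; the function/variable names here are
 * hypothetical. */
static void example_dump_stats(void *engine) {
    struct extstore_stats st;
    extstore_get_stats(engine, &st);
    // per-page data is copied into caller-owned memory; size it to page_count.
    st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data));
    if (st.page_data != NULL) {
        extstore_get_page_data(engine, &st);
        fprintf(stderr, "pages used: %llu free: %llu fragmented bytes: %llu\n",
                (unsigned long long) st.pages_used,
                (unsigned long long) st.pages_free,
                (unsigned long long) st.bytes_fragmented);
        free(st.page_data);
    }
}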
  189 
  190 const char *extstore_err(enum extstore_res res) {
  191     const char *rv = "unknown error";
  192     switch (res) {
  193         case EXTSTORE_INIT_BAD_WBUF_SIZE:
  194             rv = "page_size must be divisible by wbuf_size";
  195             break;
  196         case EXTSTORE_INIT_NEED_MORE_WBUF:
  197             rv = "wbuf_count must be >= page_buckets";
  198             break;
  199         case EXTSTORE_INIT_NEED_MORE_BUCKETS:
  200             rv = "page_buckets must be > 0";
  201             break;
  202         case EXTSTORE_INIT_PAGE_WBUF_ALIGNMENT:
  203             rv = "page_size and wbuf_size must be divisible by 1024*1024*2";
  204             break;
  205         case EXTSTORE_INIT_TOO_MANY_PAGES:
  206             rv = "page_count must total to < 65536. Increase page_size or lower path sizes";
  207             break;
  208         case EXTSTORE_INIT_OOM:
  209             rv = "failed calloc for engine";
  210             break;
  211         case EXTSTORE_INIT_OPEN_FAIL:
  212             rv = "failed to open file";
  213             break;
  214         case EXTSTORE_INIT_THREAD_FAIL:
  215             break;
  216     }
  217     return rv;
  218 }
  219 
  220 // TODO: #define's for DEFAULT_BUCKET, FREE_VERSION, etc
  221 void *extstore_init(struct extstore_conf_file *fh, struct extstore_conf *cf,
  222         enum extstore_res *res) {
  223     int i;
  224     struct extstore_conf_file *f = NULL;
  225     pthread_t thread;
  226 
  227     if (cf->page_size % cf->wbuf_size != 0) {
  228         *res = EXTSTORE_INIT_BAD_WBUF_SIZE;
  229         return NULL;
  230     }
  231     // Should ensure at least one write buffer per potential page
  232     if (cf->page_buckets > cf->wbuf_count) {
  233         *res = EXTSTORE_INIT_NEED_MORE_WBUF;
  234         return NULL;
  235     }
  236     if (cf->page_buckets < 1) {
  237         *res = EXTSTORE_INIT_NEED_MORE_BUCKETS;
  238         return NULL;
  239     }
  240 
  241     // TODO: More intelligence around alignment of flash erasure block sizes
  242     if (cf->page_size % (1024 * 1024 * 2) != 0 ||
  243         cf->wbuf_size % (1024 * 1024 * 2) != 0) {
  244         *res = EXTSTORE_INIT_PAGE_WBUF_ALIGNMENT;
  245         return NULL;
  246     }
  247 
  248     store_engine *e = calloc(1, sizeof(store_engine));
  249     if (e == NULL) {
  250         *res = EXTSTORE_INIT_OOM;
  251         return NULL;
  252     }
  253 
  254     e->page_size = cf->page_size;
  255     uint64_t temp_page_count = 0;
  256     for (f = fh; f != NULL; f = f->next) {
  257         f->fd = open(f->file, O_RDWR | O_CREAT, 0644);
  258         if (f->fd < 0) {
  259             *res = EXTSTORE_INIT_OPEN_FAIL;
  260 #ifdef EXTSTORE_DEBUG
  261             perror("extstore open");
  262 #endif
  263             free(e);
  264             return NULL;
  265         }
  266         // use an fcntl lock to help avoid double starting.
  267         struct flock lock;
  268         lock.l_type = F_WRLCK;
  269         lock.l_start = 0;
  270         lock.l_whence = SEEK_SET;
  271         lock.l_len = 0;
  272         if (fcntl(f->fd, F_SETLK, &lock) < 0) {
  273             *res = EXTSTORE_INIT_OPEN_FAIL;
  274             free(e);
  275             return NULL;
  276         }
  277         if (ftruncate(f->fd, 0) < 0) {
  278             *res = EXTSTORE_INIT_OPEN_FAIL;
  279             free(e);
  280             return NULL;
  281         }
  282 
  283         temp_page_count += f->page_count;
  284         f->offset = 0;
  285     }
  286 
  287     if (temp_page_count >= UINT16_MAX) {
  288         *res = EXTSTORE_INIT_TOO_MANY_PAGES;
  289         free(e);
  290         return NULL;
  291     }
  292     e->page_count = temp_page_count;
  293 
  294     e->pages = calloc(e->page_count, sizeof(store_page));
  295     if (e->pages == NULL) {
  296         *res = EXTSTORE_INIT_OOM;
  297         // FIXME: loop-close. make error label
  298         free(e);
  299         return NULL;
  300     }
  301 
  302     // interleave the pages between devices
  303     f = NULL; // start at the first device.
  304     for (i = 0; i < e->page_count; i++) {
  305         // find next device with available pages
  306         while (1) {
  307             // restart the loop
  308             if (f == NULL || f->next == NULL) {
  309                 f = fh;
  310             } else {
  311                 f = f->next;
  312             }
  313             if (f->page_count) {
  314                 f->page_count--;
  315                 break;
  316             }
  317         }
  318         pthread_mutex_init(&e->pages[i].mutex, NULL);
  319         e->pages[i].id = i;
  320         e->pages[i].fd = f->fd;
  321         e->pages[i].free_bucket = f->free_bucket;
  322         e->pages[i].offset = f->offset;
  323         e->pages[i].free = true;
  324         f->offset += e->page_size;
  325     }
  326 
  327     // free page buckets allow the app to organize devices by use case
  328     e->free_page_buckets = calloc(cf->page_buckets, sizeof(store_page *));
  329     e->page_bucketcount = cf->page_buckets;
  330 
  331     for (i = e->page_count-1; i > 0; i--) {
  332         e->page_free++;
  333         if (e->pages[i].free_bucket == 0) {
  334             e->pages[i].next = e->page_freelist;
  335             e->page_freelist = &e->pages[i];
  336         } else {
  337             int fb = e->pages[i].free_bucket;
  338             e->pages[i].next = e->free_page_buckets[fb];
  339             e->free_page_buckets[fb] = &e->pages[i];
  340         }
  341     }
  342 
  343     // 0 is magic "page is freed" version
  344     e->version = 1;
  345 
  346     // scratch data for stats. TODO: handle calloc failure
  347     e->stats.page_data =
  348         calloc(e->page_count, sizeof(struct extstore_page_data));
  349     e->stats.page_count = e->page_count;
  350     e->stats.page_size = e->page_size;
  351 
  352     // page buckets lazily have pages assigned into them
  353     e->page_buckets = calloc(cf->page_buckets, sizeof(store_page *));
  354     e->page_bucketcount = cf->page_buckets;
  355 
  356     // allocate write buffers
  357     // also IO's to use for shipping to IO thread
  358     for (i = 0; i < cf->wbuf_count; i++) {
  359         _store_wbuf *w = wbuf_new(cf->wbuf_size);
  360         obj_io *io = calloc(1, sizeof(obj_io));
  361         /* TODO: on error, loop again and free stack. */
  362         w->next = e->wbuf_stack;
  363         e->wbuf_stack = w;
  364         io->next = e->io_stack;
  365         e->io_stack = io;
  366     }
  367 
  368     pthread_mutex_init(&e->mutex, NULL);
  369     pthread_mutex_init(&e->stats_mutex, NULL);
  370 
  371     e->io_depth = cf->io_depth;
  372 
  373     // spawn threads
  374     e->io_threads = calloc(cf->io_threadcount, sizeof(store_io_thread));
  375     for (i = 0; i < cf->io_threadcount; i++) {
  376         pthread_mutex_init(&e->io_threads[i].mutex, NULL);
  377         pthread_cond_init(&e->io_threads[i].cond, NULL);
  378         e->io_threads[i].e = e;
  379         // FIXME: error handling
  380         pthread_create(&thread, NULL, extstore_io_thread, &e->io_threads[i]);
  381     }
  382     e->io_threadcount = cf->io_threadcount;
  383 
  384     e->maint_thread = calloc(1, sizeof(store_maint_thread));
  385     e->maint_thread->e = e;
  386     // FIXME: error handling
  387     pthread_mutex_init(&e->maint_thread->mutex, NULL);
  388     pthread_cond_init(&e->maint_thread->cond, NULL);
  389     pthread_create(&thread, NULL, extstore_maint_thread, e->maint_thread);
  390 
  391     extstore_run_maint(e);
  392 
  393     return (void *)e;
  394 }
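
/* Illustrative init sketch with hypothetical names and sizes: wiring a single
 * backing file into the engine. Only fields referenced in this file are set;
 * the values merely satisfy the checks in extstore_init() (2MB-aligned sizes,
 * page_size divisible by wbuf_size, wbuf_count >= page_buckets). */
static void *example_engine_start(char *path) {
    struct extstore_conf_file fh = {0};
    struct extstore_conf cf = {0};
    enum extstore_res res;

    fh.file = path;                   // file to create/open for pages
    fh.page_count = 64;               // pages carved out of this file
    fh.free_bucket = 0;               // return pages to the default freelist
    fh.next = NULL;

    cf.page_size = 1024 * 1024 * 64;  // 64MB pages
    cf.wbuf_size = 1024 * 1024 * 8;   // 8MB write buffers
    cf.wbuf_count = 4;
    cf.page_buckets = 1;
    cf.io_threadcount = 1;
    cf.io_depth = 1;

    void *e = extstore_init(&fh, &cf, &res);
    if (e == NULL) {
        fprintf(stderr, "extstore_init failed: %s\n", extstore_err(res));
    }
    return e;
}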
  395 
  396 void extstore_run_maint(void *ptr) {
  397     store_engine *e = (store_engine *)ptr;
  398     pthread_cond_signal(&e->maint_thread->cond);
  399 }
  400 
  401 // call with *e locked
  402 static store_page *_allocate_page(store_engine *e, unsigned int bucket,
  403         unsigned int free_bucket) {
  404     assert(!e->page_buckets[bucket] || e->page_buckets[bucket]->allocated == e->page_size);
  405     store_page *tmp = NULL;
  406     // if a specific free bucket was requested, check there first
  407     if (free_bucket != 0 && e->free_page_buckets[free_bucket] != NULL) {
  408         assert(e->page_free > 0);
  409         tmp = e->free_page_buckets[free_bucket];
  410         e->free_page_buckets[free_bucket] = tmp->next;
  411     }
  412     // failing that, try the global list.
  413     if (tmp == NULL && e->page_freelist != NULL) {
  414         tmp = e->page_freelist;
  415         e->page_freelist = tmp->next;
  416     }
  417     E_DEBUG("EXTSTORE: allocating new page\n");
  418     // page_freelist can be empty if the only free pages are specialized and
  419     // we didn't just request one.
  420     if (e->page_free > 0 && tmp != NULL) {
  421         tmp->next = e->page_buckets[bucket];
  422         e->page_buckets[bucket] = tmp;
  423         tmp->active = true;
  424         tmp->free = false;
  425         tmp->closed = false;
  426         tmp->version = _next_version(e);
  427         tmp->bucket = bucket;
  428         e->page_free--;
  429         STAT_INCR(e, page_allocs, 1);
  430     } else {
  431         extstore_run_maint(e);
  432     }
  433     if (tmp)
  434         E_DEBUG("EXTSTORE: got page %u\n", tmp->id);
  435     return tmp;
  436 }
  437 
  438 // call with *p locked. locks *e
  439 static void _allocate_wbuf(store_engine *e, store_page *p) {
  440     _store_wbuf *wbuf = NULL;
  441     assert(!p->wbuf);
  442     pthread_mutex_lock(&e->mutex);
  443     if (e->wbuf_stack) {
  444         wbuf = e->wbuf_stack;
  445         e->wbuf_stack = wbuf->next;
  446         wbuf->next = 0;
  447     }
  448     pthread_mutex_unlock(&e->mutex);
  449     if (wbuf) {
  450         wbuf->offset = p->allocated;
  451         p->allocated += wbuf->size;
  452         wbuf->free = wbuf->size;
  453         wbuf->buf_pos = wbuf->buf;
  454         wbuf->full = false;
  455         wbuf->flushed = false;
  456 
  457         p->wbuf = wbuf;
  458     }
  459 }
  460 
  461 /* callback after wbuf is flushed. can only remove wbuf's from the head onward
  462  * if successfully flushed, which complicates this routine. each callback
  463  * attempts to free the wbuf stack, which is finally done when the head wbuf's
  464  * callback happens.
  465  * It's rare for flushes to happen out of order.
  466  */
  467 static void _wbuf_cb(void *ep, obj_io *io, int ret) {
  468     store_engine *e = (store_engine *)ep;
  469     store_page *p = &e->pages[io->page_id];
  470     _store_wbuf *w = (_store_wbuf *) io->data;
  471 
  472     // TODO: Examine return code. Not entirely sure how to handle errors.
  473     // Naive first-pass should probably cause the page to close/free.
  474     w->flushed = true;
  475     pthread_mutex_lock(&p->mutex);
  476     assert(p->wbuf != NULL && p->wbuf == w);
  477     assert(p->written == w->offset);
  478     p->written += w->size;
  479     p->wbuf = NULL;
  480 
  481     if (p->written == e->page_size)
  482         p->active = false;
  483 
  484     // return the wbuf
  485     pthread_mutex_lock(&e->mutex);
  486     w->next = e->wbuf_stack;
  487     e->wbuf_stack = w;
  488     // also return the IO we just used.
  489     io->next = e->io_stack;
  490     e->io_stack = io;
  491     pthread_mutex_unlock(&e->mutex);
  492     pthread_mutex_unlock(&p->mutex);
  493 }
  494 
  495 /* Wraps a page's current wbuf in an obj_io and submits it to an IO thread.
  496  * Called with p locked, locks e.
  497  */
  498 static void _submit_wbuf(store_engine *e, store_page *p) {
  499     _store_wbuf *w;
  500     pthread_mutex_lock(&e->mutex);
  501     obj_io *io = e->io_stack;
  502     e->io_stack = io->next;
  503     pthread_mutex_unlock(&e->mutex);
  504     w = p->wbuf;
  505 
  506     // zero out the end of the wbuf to allow blind readback of data.
  507     memset(w->buf + (w->size - w->free), 0, w->free);
  508 
  509     io->next = NULL;
  510     io->mode = OBJ_IO_WRITE;
  511     io->page_id = p->id;
  512     io->data = w;
  513     io->offset = w->offset;
  514     io->len = w->size;
  515     io->buf = w->buf;
  516     io->cb = _wbuf_cb;
  517 
  518     extstore_submit(e, io);
  519 }
  520 
  521 /* engine write function; takes engine, bucket ids, obj_io.
  522  * fast fail if no available write buffer (flushing)
  523  * lock engine context, find active page, unlock
  524  * if page full, submit page/buffer to io thread.
  525  *
  526  * write is designed to be flaky; if the page is full, the caller must try again
  527  * to get a new page. best if used from a background thread that can harmlessly retry.
  528  */
  529 
  530 int extstore_write_request(void *ptr, unsigned int bucket,
  531         unsigned int free_bucket, obj_io *io) {
  532     store_engine *e = (store_engine *)ptr;
  533     store_page *p;
  534     int ret = -1;
  535     if (bucket >= e->page_bucketcount)
  536         return ret;
  537 
  538     pthread_mutex_lock(&e->mutex);
  539     p = e->page_buckets[bucket];
  540     if (!p) {
  541         p = _allocate_page(e, bucket, free_bucket);
  542     }
  543     pthread_mutex_unlock(&e->mutex);
  544     if (!p)
  545         return ret;
  546 
  547     pthread_mutex_lock(&p->mutex);
  548 
  549     // FIXME: can't null out page_buckets!!!
  550     // page is full, clear bucket and retry later.
  551     if (!p->active ||
  552             ((!p->wbuf || p->wbuf->full) && p->allocated >= e->page_size)) {
  553         pthread_mutex_unlock(&p->mutex);
  554         pthread_mutex_lock(&e->mutex);
  555         _allocate_page(e, bucket, free_bucket);
  556         pthread_mutex_unlock(&e->mutex);
  557         return ret;
  558     }
  559 
  560     // if io won't fit, submit IO for wbuf and find new one.
  561     if (p->wbuf && p->wbuf->free < io->len && !p->wbuf->full) {
  562         _submit_wbuf(e, p);
  563         p->wbuf->full = true;
  564     }
  565 
  566     if (!p->wbuf && p->allocated < e->page_size) {
  567         _allocate_wbuf(e, p);
  568     }
  569 
  570     // hand over buffer for caller to copy into
  571     // leaves p locked.
  572     if (p->wbuf && !p->wbuf->full && p->wbuf->free >= io->len) {
  573         io->buf = p->wbuf->buf_pos;
  574         io->page_id = p->id;
  575         return 0;
  576     }
  577 
  578     pthread_mutex_unlock(&p->mutex);
  579     // p->written is incremented post-wbuf flush
  580     return ret;
  581 }
  582 
  583 /* _must_ be called after a successful write_request.
  584  * fills the rest of io structure.
  585  */
  586 void extstore_write(void *ptr, obj_io *io) {
  587     store_engine *e = (store_engine *)ptr;
  588     store_page *p = &e->pages[io->page_id];
  589 
  590     io->offset = p->wbuf->offset + (p->wbuf->size - p->wbuf->free);
  591     io->page_version = p->version;
  592     p->wbuf->buf_pos += io->len;
  593     p->wbuf->free -= io->len;
  594     p->bytes_used += io->len;
  595     p->obj_count++;
  596     STAT_L(e);
  597     e->stats.bytes_written += io->len;
  598     e->stats.bytes_used += io->len;
  599     e->stats.objects_written++;
  600     e->stats.objects_used++;
  601     STAT_UL(e);
  602 
  603     pthread_mutex_unlock(&p->mutex);
  604 }
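
/* Illustrative write-path sketch (hypothetical helper; bucket 0 and free
 * bucket 0 are arbitrary): extstore_write_request() hands back a wbuf cursor
 * in io->buf with the page still locked, the caller copies its serialized
 * object in, and extstore_write() fills io->offset/io->page_version and
 * unlocks the page. */
static int example_store_object(void *engine, char *data, unsigned int len,
        obj_io *io) {
    io->len = len;
    if (extstore_write_request(engine, 0, 0, io) != 0) {
        // no writable page/buffer right now; per the comment above, callers
        // (e.g. a background flusher) simply retry later.
        return -1;
    }
    memcpy(io->buf, data, len);
    extstore_write(engine, io);
    // io->page_id, io->page_version and io->offset now locate the object for
    // later reads or deletes.
    return 0;
}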
  605 
  606 /* engine submit function; takes engine, obj_io stack.
  607  * lock io_thread context and add stack?
  608  * signal io thread to wake.
  609  * return success.
  610  */
  611 int extstore_submit(void *ptr, obj_io *io) {
  612     store_engine *e = (store_engine *)ptr;
  613 
  614     unsigned int depth = 0;
  615     obj_io *tio = io;
  616     obj_io *tail = NULL;
  617     while (tio != NULL) {
  618         tail = tio; // keep updating potential tail.
  619         depth++;
  620         tio = tio->next;
  621     }
  622 
  623     store_io_thread *t = _get_io_thread(e);
  624     pthread_mutex_lock(&t->mutex);
  625 
  626     t->depth += depth;
  627     if (t->queue == NULL) {
  628         t->queue = io;
  629         t->queue_tail = tail;
  630     } else {
  631         // Have to put the *io stack at the end of current queue.
  632         assert(tail->next == NULL);
  633         assert(t->queue_tail->next == NULL);
  634         t->queue_tail->next = io;
  635         t->queue_tail = tail;
  636     }
  637 
  638     pthread_mutex_unlock(&t->mutex);
  639 
  640     //pthread_mutex_lock(&t->mutex);
  641     pthread_cond_signal(&t->cond);
  642     //pthread_mutex_unlock(&t->mutex);
  643     return 0;
  644 }
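
/* Illustrative read sketch: a hypothetical caller-side callback plus a helper
 * that fills an obj_io for OBJ_IO_READ and hands it to extstore_submit(). The
 * page_id/page_version/offset/len values are those saved at write time; a
 * negative ret in the callback means a read error or that the page was freed,
 * closed or recycled (see the version check in the IO thread below). */
static void example_read_cb(void *engine, obj_io *io, int ret) {
    (void) engine;
    if (ret < 0) {
        fprintf(stderr, "extstore read failed for page %u: %d\n",
                io->page_id, ret);
        return;
    }
    // ret is the number of bytes read into io->buf (or io->iov).
}

static void example_read_object(void *engine, obj_io *io, char *buf) {
    io->next = NULL;
    io->mode = OBJ_IO_READ;
    io->buf = buf;      // alternatively fill io->iov/io->iovcnt
    io->iov = NULL;
    io->cb = example_read_cb;
    // io->page_id, io->page_version, io->offset and io->len were recorded
    // when the object was written.
    extstore_submit(engine, io);
}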
  645 
  646 /* engine note-delete function: takes engine, page id/version, count, and bytes.
  647  * notes that objects in this page are no longer valid.
  648  */
  649 int extstore_delete(void *ptr, unsigned int page_id, uint64_t page_version,
  650         unsigned int count, unsigned int bytes) {
  651     store_engine *e = (store_engine *)ptr;
  652     // FIXME: validate page_id in bounds
  653     store_page *p = &e->pages[page_id];
  654     int ret = 0;
  655 
  656     pthread_mutex_lock(&p->mutex);
  657     if (!p->closed && p->version == page_version) {
  658         if (p->bytes_used >= bytes) {
  659             p->bytes_used -= bytes;
  660         } else {
  661             p->bytes_used = 0;
  662         }
  663 
  664         if (p->obj_count >= count) {
  665             p->obj_count -= count;
  666         } else {
  667             p->obj_count = 0; // caller has bad accounting?
  668         }
  669         STAT_L(e);
  670         e->stats.bytes_used -= bytes;
  671         e->stats.objects_used -= count;
  672         STAT_UL(e);
  673 
  674         if (p->obj_count == 0) {
  675             extstore_run_maint(e);
  676         }
  677     } else {
  678         ret = -1;
  679     }
  680     pthread_mutex_unlock(&p->mutex);
  681     return ret;
  682 }
  683 
  684 int extstore_check(void *ptr, unsigned int page_id, uint64_t page_version) {
  685     store_engine *e = (store_engine *)ptr;
  686     store_page *p = &e->pages[page_id];
  687     int ret = 0;
  688 
  689     pthread_mutex_lock(&p->mutex);
  690     if (p->version != page_version)
  691         ret = -1;
  692     pthread_mutex_unlock(&p->mutex);
  693     return ret;
  694 }
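
/* Illustrative sketch (hypothetical helper): callers keep the page id/version
 * returned at write time with each stored item. extstore_check() cheaply asks
 * whether that location is still valid, and extstore_delete() returns the
 * space for accounting once the item is dropped. */
static void example_drop_object(void *engine, unsigned int page_id,
        uint64_t page_version, unsigned int nbytes) {
    if (extstore_check(engine, page_id, page_version) != 0) {
        return; // page already recycled; nothing left to account.
    }
    // one object of nbytes; must match what was accounted at write time.
    extstore_delete(engine, page_id, page_version, 1, nbytes);
}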
  695 
  696 /* allows a compactor to say "we're done with this page, kill it." */
  697 void extstore_close_page(void *ptr, unsigned int page_id, uint64_t page_version) {
  698     store_engine *e = (store_engine *)ptr;
  699     store_page *p = &e->pages[page_id];
  700 
  701     pthread_mutex_lock(&p->mutex);
  702     if (!p->closed && p->version == page_version) {
  703         p->closed = true;
  704         extstore_run_maint(e);
  705     }
  706     pthread_mutex_unlock(&p->mutex);
  707 }
  708 
  709 /* Finds an attached wbuf that can satisfy the read.
  710  * Since wbufs can potentially be flushed to disk out of order, they are only
  711  * removed as the head of the list successfully flushes to disk.
  712  */
  713 // call with *p locked
  714 // FIXME: protect from reading past wbuf
  715 static inline int _read_from_wbuf(store_page *p, obj_io *io) {
  716     _store_wbuf *wbuf = p->wbuf;
  717     assert(wbuf != NULL);
  718     assert(io->offset < p->written + wbuf->size);
  719     if (io->iov == NULL) {
  720         memcpy(io->buf, wbuf->buf + (io->offset - wbuf->offset), io->len);
  721     } else {
  722         int x;
  723         unsigned int off = io->offset - wbuf->offset;
  724         // need to loop fill iovecs
  725         for (x = 0; x < io->iovcnt; x++) {
  726             struct iovec *iov = &io->iov[x];
  727             memcpy(iov->iov_base, wbuf->buf + off, iov->iov_len);
  728             off += iov->iov_len;
  729         }
  730     }
  731     return io->len;
  732 }
  733 
  734 /* engine IO thread; takes engine context
  735  * manage writes/reads
  736  * runs IO callbacks inline after each IO
  737  */
  738 // FIXME: protect from reading past page
  739 static void *extstore_io_thread(void *arg) {
  740     store_io_thread *me = (store_io_thread *)arg;
  741     store_engine *e = me->e;
  742     while (1) {
  743         obj_io *io_stack = NULL;
  744         pthread_mutex_lock(&me->mutex);
  745         if (me->queue == NULL) {
  746             pthread_cond_wait(&me->cond, &me->mutex);
  747         }
  748 
  749         // Pull and disconnect a batch from the queue
  750         // Chew small batches from the queue so the IO thread picker can keep
  751         // the IO queue depth even, instead of piling on threads one at a time
  752         // as they gobble a queue.
  753         if (me->queue != NULL) {
  754             int i;
  755             obj_io *end = NULL;
  756             io_stack = me->queue;
  757             end = io_stack;
  758             for (i = 1; i < e->io_depth; i++) {
  759                 if (end->next) {
  760                     end = end->next;
  761                 } else {
  762                     me->queue_tail = end->next;
  763                     break;
  764                 }
  765             }
  766             me->depth -= i;
  767             me->queue = end->next;
  768             end->next = NULL;
  769         }
  770         pthread_mutex_unlock(&me->mutex);
  771 
  772         obj_io *cur_io = io_stack;
  773         while (cur_io) {
  774             // We need to note next before the callback in case the obj_io
  775             // gets reused.
  776             obj_io *next = cur_io->next;
  777             int ret = 0;
  778             int do_op = 1;
  779             store_page *p = &e->pages[cur_io->page_id];
  780             // TODO: loop if not enough bytes were read/written.
  781             switch (cur_io->mode) {
  782                 case OBJ_IO_READ:
  783                     // Page may still be open; reads at/past the flushed offset come from the wbuf.
  784                     pthread_mutex_lock(&p->mutex);
  785                     if (!p->free && !p->closed && p->version == cur_io->page_version) {
  786                         if (p->active && cur_io->offset >= p->written) {
  787                             ret = _read_from_wbuf(p, cur_io);
  788                             do_op = 0;
  789                         } else {
  790                             p->refcount++;
  791                         }
  792                         STAT_L(e);
  793                         e->stats.bytes_read += cur_io->len;
  794                         e->stats.objects_read++;
  795                         STAT_UL(e);
  796                     } else {
  797                         do_op = 0;
  798                         ret = -2; // TODO: enum in IO for status?
  799                     }
  800                     pthread_mutex_unlock(&p->mutex);
  801                     if (do_op) {
  802 #if !defined(HAVE_PREAD) || !defined(HAVE_PREADV)
  803                         // TODO: lseek offset is natively 64-bit on OS X, but
  804                         // perhaps not on all platforms? Else use lseek64()
  805                         ret = lseek(p->fd, p->offset + cur_io->offset, SEEK_SET);
  806                         if (ret >= 0) {
  807                             if (cur_io->iov == NULL) {
  808                                 ret = read(p->fd, cur_io->buf, cur_io->len);
  809                             } else {
  810                                 ret = readv(p->fd, cur_io->iov, cur_io->iovcnt);
  811                             }
  812                         }
  813 #else
  814                         if (cur_io->iov == NULL) {
  815                             ret = pread(p->fd, cur_io->buf, cur_io->len, p->offset + cur_io->offset);
  816                         } else {
  817                             ret = preadv(p->fd, cur_io->iov, cur_io->iovcnt, p->offset + cur_io->offset);
  818                         }
  819 #endif
  820                     }
  821                     break;
  822                 case OBJ_IO_WRITE:
  823                     do_op = 0;
  824                     // FIXME: Should hold refcount during write. doesn't
  825                     // currently matter since page can't free while active.
  826                     ret = pwrite(p->fd, cur_io->buf, cur_io->len, p->offset + cur_io->offset);
  827                     break;
  828             }
  829             if (ret == 0) {
  830                 E_DEBUG("read returned nothing\n");
  831             }
  832 
  833 #ifdef EXTSTORE_DEBUG
  834             if (ret == -1) {
  835                 perror("read/write op failed");
  836             }
  837 #endif
  838             cur_io->cb(e, cur_io, ret);
  839             if (do_op) {
  840                 pthread_mutex_lock(&p->mutex);
  841                 p->refcount--;
  842                 pthread_mutex_unlock(&p->mutex);
  843             }
  844             cur_io = next;
  845         }
  846     }
  847 
  848     return NULL;
  849 }
  850 
  851 // call with *p locked.
  852 static void _free_page(store_engine *e, store_page *p) {
  853     store_page *tmp = NULL;
  854     store_page *prev = NULL;
  855     E_DEBUG("EXTSTORE: freeing page %u\n", p->id);
  856     STAT_L(e);
  857     e->stats.objects_used -= p->obj_count;
  858     e->stats.bytes_used -= p->bytes_used;
  859     e->stats.page_reclaims++;
  860     STAT_UL(e);
  861     pthread_mutex_lock(&e->mutex);
  862     // unlink page from bucket list
  863     tmp = e->page_buckets[p->bucket];
  864     while (tmp) {
  865         if (tmp == p) {
  866             if (prev) {
  867                 prev->next = tmp->next;
  868             } else {
  869                 e->page_buckets[p->bucket] = tmp->next;
  870             }
  871             tmp->next = NULL;
  872             break;
  873         }
  874         prev = tmp;
  875         tmp = tmp->next;
  876     }
  877     // reset most values
  878     p->version = 0;
  879     p->obj_count = 0;
  880     p->bytes_used = 0;
  881     p->allocated = 0;
  882     p->written = 0;
  883     p->bucket = 0;
  884     p->active = false;
  885     p->closed = false;
  886     p->free = true;
  887     // add to page stack
  888     // TODO: free_page_buckets first class and remove redundancy?
  889     if (p->free_bucket != 0) {
  890         p->next = e->free_page_buckets[p->free_bucket];
  891         e->free_page_buckets[p->free_bucket] = p;
  892     } else {
  893         p->next = e->page_freelist;
  894         e->page_freelist = p;
  895     }
  896     e->page_free++;
  897     pthread_mutex_unlock(&e->mutex);
  898 }
  899 
  900 /* engine maint thread; takes engine context.
  901  * Uses version to ensure oldest possible objects are being evicted.
  902  * Needs interface to inform owner of pages with fewer objects or most space
  903  * free, which can then be actively compacted to avoid eviction.
  904  *
  905  * This gets called asynchronously after every page allocation. Could run less
  906  * often if more pages are free.
  907  *
  908  * Another allocation call is required if an attempted free didn't happen
  909  * due to the page having a refcount.
  910  */
  911 
  912 // TODO: Don't over-evict pages if waiting on refcounts to drop
  913 static void *extstore_maint_thread(void *arg) {
  914     store_maint_thread *me = (store_maint_thread *)arg;
  915     store_engine *e = me->e;
  916     struct extstore_page_data *pd =
  917         calloc(e->page_count, sizeof(struct extstore_page_data));
  918     pthread_mutex_lock(&me->mutex);
  919     while (1) {
  920         int i;
  921         bool do_evict = false;
  922         unsigned int low_page = 0;
  923         uint64_t low_version = ULLONG_MAX;
  924 
  925         pthread_cond_wait(&me->cond, &me->mutex);
  926         pthread_mutex_lock(&e->mutex);
  927         // default freelist requires at least one page free.
  928         // specialized freelists fall back to default once full.
  929         if (e->page_free == 0 || e->page_freelist == NULL) {
  930             do_evict = true;
  931         }
  932         pthread_mutex_unlock(&e->mutex);
  933         memset(pd, 0, sizeof(struct extstore_page_data) * e->page_count);
  934 
  935         for (i = 0; i < e->page_count; i++) {
  936             store_page *p = &e->pages[i];
  937             pthread_mutex_lock(&p->mutex);
  938             pd[p->id].free_bucket = p->free_bucket;
  939             if (p->active || p->free) {
  940                 pthread_mutex_unlock(&p->mutex);
  941                 continue;
  942             }
  943             if (p->obj_count > 0 && !p->closed) {
  944                 pd[p->id].version = p->version;
  945                 pd[p->id].bytes_used = p->bytes_used;
  946                 pd[p->id].bucket = p->bucket;
  947                 // low_version/low_page are only used in the eviction
  948                 // scenario. when we evict, it's only to fill the default page
  949                 // bucket again.
  950                 // TODO: experiment with allowing evicting up to a single page
  951                 // for any specific free bucket. this is *probably* required
  952                 // since it could cause a load bias on default-only devices?
  953                 if (p->free_bucket == 0 && p->version < low_version) {
  954                     low_version = p->version;
  955                     low_page = i;
  956                 }
  957             }
  958             if ((p->obj_count == 0 || p->closed) && p->refcount == 0) {
  959                 _free_page(e, p);
  960                 // Found a page to free, no longer need to evict.
  961                 do_evict = false;
  962             }
  963             pthread_mutex_unlock(&p->mutex);
  964         }
  965 
  966         if (do_evict && low_version != ULLONG_MAX) {
  967             store_page *p = &e->pages[low_page];
  968             E_DEBUG("EXTSTORE: evicting page [%d] [v: %llu]\n",
  969                     p->id, (unsigned long long) p->version);
  970             pthread_mutex_lock(&p->mutex);
  971             if (!p->closed) {
  972                 p->closed = true;
  973                 STAT_L(e);
  974                 e->stats.page_evictions++;
  975                 e->stats.objects_evicted += p->obj_count;
  976                 e->stats.bytes_evicted += p->bytes_used;
  977                 STAT_UL(e);
  978                 if (p->refcount == 0) {
  979                     _free_page(e, p);
  980                 }
  981             }
  982             pthread_mutex_unlock(&p->mutex);
  983         }
  984 
  985         // copy the page data into engine context so callers can use it from
  986         // the stats lock.
  987         STAT_L(e);
  988         memcpy(e->stats.page_data, pd,
  989                 sizeof(struct extstore_page_data) * e->page_count);
  990         STAT_UL(e);
  991     }
  992 
  993     return NULL;
  994 }