"Fossies" - the Fresh Open Source Software Archive 
Member "memcached-1.6.9/extstore.c" (21 Nov 2020, 31990 Bytes) of package /linux/www/memcached-1.6.9.tar.gz:
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */

// FIXME: config.h?
#include <stdint.h>
#include <stdbool.h>
// end FIXME
#include <stdlib.h>
#include <limits.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/uio.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include "extstore.h"
#include "config.h"

// TODO: better if an init option turns this on/off.
#ifdef EXTSTORE_DEBUG
#define E_DEBUG(...) \
    do { \
        fprintf(stderr, __VA_ARGS__); \
    } while (0)
#else
#define E_DEBUG(...)
#endif

#define STAT_L(e) pthread_mutex_lock(&e->stats_mutex);
#define STAT_UL(e) pthread_mutex_unlock(&e->stats_mutex);
#define STAT_INCR(e, stat, amount) { \
    pthread_mutex_lock(&e->stats_mutex); \
    e->stats.stat += amount; \
    pthread_mutex_unlock(&e->stats_mutex); \
}

#define STAT_DECR(e, stat, amount) { \
    pthread_mutex_lock(&e->stats_mutex); \
    e->stats.stat -= amount; \
    pthread_mutex_unlock(&e->stats_mutex); \
}

typedef struct __store_wbuf {
    struct __store_wbuf *next;
    char *buf;
    char *buf_pos;
    unsigned int free;
    unsigned int size;
    unsigned int offset; /* offset into page this write starts at */
    bool full; /* done writing to this page */
    bool flushed; /* whether wbuf has been flushed to disk */
} _store_wbuf;

typedef struct _store_page {
    pthread_mutex_t mutex; /* Need to be held for most operations */
    uint64_t obj_count; /* _delete can decrease post-closing */
    uint64_t bytes_used; /* _delete can decrease post-closing */
    uint64_t offset; /* starting address of page within fd */
    unsigned int version;
    unsigned int refcount;
    unsigned int allocated;
    unsigned int written; /* item offsets can be past written if wbuf not flushed */
    unsigned int bucket; /* which bucket the page is linked into */
    unsigned int free_bucket; /* which bucket this page returns to when freed */
    int fd;
    unsigned short id;
    bool active; /* actively being written to */
    bool closed; /* closed and draining before free */
    bool free; /* on freelist */
    _store_wbuf *wbuf; /* currently active wbuf from the stack */
    struct _store_page *next;
} store_page;

typedef struct store_engine store_engine;
typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    obj_io *queue;
    store_engine *e;
    unsigned int depth; // queue depth
} store_io_thread;

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    store_engine *e;
} store_maint_thread;

struct store_engine {
    pthread_mutex_t mutex; /* covers internal stacks and variables */
    store_page *pages; /* directly addressable page list */
    _store_wbuf *wbuf_stack; /* wbuf freelist */
    obj_io *io_stack; /* IO's to use with submitting wbuf's */
    store_io_thread *io_threads;
    store_maint_thread *maint_thread;
    store_page *page_freelist;
    store_page **page_buckets; /* stack of pages currently allocated to each bucket */
    store_page **free_page_buckets; /* stack of use-case isolated free pages */
    size_t page_size;
    unsigned int version; /* global version counter */
    unsigned int last_io_thread; /* round robin the IO threads */
    unsigned int io_threadcount; /* count of IO threads */
    unsigned int page_count;
    unsigned int page_free; /* unallocated pages */
    unsigned int page_bucketcount; /* count of potential page buckets */
    unsigned int free_page_bucketcount; /* count of free page buckets */
    unsigned int io_depth; /* FIXME: Might cache into thr struct */
    pthread_mutex_t stats_mutex;
    struct extstore_stats stats;
};

static _store_wbuf *wbuf_new(size_t size) {
    _store_wbuf *b = calloc(1, sizeof(_store_wbuf));
    if (b == NULL)
        return NULL;
    b->buf = calloc(size, sizeof(char));
    if (b->buf == NULL) {
        free(b);
        return NULL;
    }
    b->buf_pos = b->buf;
    b->free = size;
    b->size = size;
    return b;
}

static store_io_thread *_get_io_thread(store_engine *e) {
    int tid = -1;
    long long int low = LLONG_MAX;
    pthread_mutex_lock(&e->mutex);
    // find smallest queue. ignoring lock since being wrong isn't fatal.
    // TODO: if average queue depth can be quickly tracked, can break as soon
    // as we see a thread that's less than average, and start from last_io_thread
    for (int x = 0; x < e->io_threadcount; x++) {
        if (e->io_threads[x].depth == 0) {
            tid = x;
            break;
        } else if (e->io_threads[x].depth < low) {
            tid = x;
            low = e->io_threads[x].depth;
        }
    }
    pthread_mutex_unlock(&e->mutex);

    return &e->io_threads[tid];
}

static uint64_t _next_version(store_engine *e) {
    return e->version++;
}

static void *extstore_io_thread(void *arg);
static void *extstore_maint_thread(void *arg);

/* Copies stats internal to engine and computes any derived values */
void extstore_get_stats(void *ptr, struct extstore_stats *st) {
    store_engine *e = (store_engine *)ptr;
    STAT_L(e);
    memcpy(st, &e->stats, sizeof(struct extstore_stats));
    STAT_UL(e);

    // grab pages_free/pages_used
    pthread_mutex_lock(&e->mutex);
    st->pages_free = e->page_free;
    st->pages_used = e->page_count - e->page_free;
    pthread_mutex_unlock(&e->mutex);
    st->io_queue = 0;
    for (int x = 0; x < e->io_threadcount; x++) {
        pthread_mutex_lock(&e->io_threads[x].mutex);
        st->io_queue += e->io_threads[x].depth;
        pthread_mutex_unlock(&e->io_threads[x].mutex);
    }
    // calculate bytes_fragmented.
    // note that open and not-yet-filled pages count against fragmentation.
    st->bytes_fragmented = st->pages_used * e->page_size -
        st->bytes_used;
}
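
// Illustrative usage sketch (editorial addition, not part of upstream memcached):
// a minimal, hedged example of polling the stats interface above. "engine" is
// assumed to be the pointer returned by extstore_init(); "report_extstore_usage"
// is a hypothetical caller-side helper. Per-page detail requires the caller to
// supply a page_data buffer sized for stats.page_count entries.
#if 0
static void report_extstore_usage(void *engine) {
    struct extstore_stats st;
    extstore_get_stats(engine, &st);    // copies counters under the stats lock
    fprintf(stderr, "extstore: %llu/%llu pages used, %llu bytes fragmented\n",
            (unsigned long long) st.pages_used,
            (unsigned long long) (st.pages_used + st.pages_free),
            (unsigned long long) st.bytes_fragmented);

    // per-page snapshot, filled by the maint thread and copied out here.
    st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data));
    if (st.page_data != NULL) {
        extstore_get_page_data(engine, &st);
        free(st.page_data);
    }
}
#endif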

void extstore_get_page_data(void *ptr, struct extstore_stats *st) {
    store_engine *e = (store_engine *)ptr;
    STAT_L(e);
    memcpy(st->page_data, e->stats.page_data,
            sizeof(struct extstore_page_data) * e->page_count);
    STAT_UL(e);
}

const char *extstore_err(enum extstore_res res) {
    const char *rv = "unknown error";
    switch (res) {
    case EXTSTORE_INIT_BAD_WBUF_SIZE:
        rv = "page_size must be divisible by wbuf_size";
        break;
    case EXTSTORE_INIT_NEED_MORE_WBUF:
        rv = "wbuf_count must be >= page_buckets";
        break;
    case EXTSTORE_INIT_NEED_MORE_BUCKETS:
        rv = "page_buckets must be > 0";
        break;
    case EXTSTORE_INIT_PAGE_WBUF_ALIGNMENT:
        rv = "page_size and wbuf_size must be divisible by 1024*1024*2";
        break;
    case EXTSTORE_INIT_TOO_MANY_PAGES:
        rv = "page_count must total to < 65536. Increase page_size or lower path sizes";
        break;
    case EXTSTORE_INIT_OOM:
        rv = "failed calloc for engine";
        break;
    case EXTSTORE_INIT_OPEN_FAIL:
        rv = "failed to open file";
        break;
    case EXTSTORE_INIT_THREAD_FAIL:
        break;
    }
    return rv;
}

// TODO: #define's for DEFAULT_BUCKET, FREE_VERSION, etc
void *extstore_init(struct extstore_conf_file *fh, struct extstore_conf *cf,
        enum extstore_res *res) {
    int i;
    struct extstore_conf_file *f = NULL;
    pthread_t thread;

    if (cf->page_size % cf->wbuf_size != 0) {
        *res = EXTSTORE_INIT_BAD_WBUF_SIZE;
        return NULL;
    }
    // Should ensure at least one write buffer per potential page
    if (cf->page_buckets > cf->wbuf_count) {
        *res = EXTSTORE_INIT_NEED_MORE_WBUF;
        return NULL;
    }
    if (cf->page_buckets < 1) {
        *res = EXTSTORE_INIT_NEED_MORE_BUCKETS;
        return NULL;
    }

    // TODO: More intelligence around alignment of flash erasure block sizes
    if (cf->page_size % (1024 * 1024 * 2) != 0 ||
        cf->wbuf_size % (1024 * 1024 * 2) != 0) {
        *res = EXTSTORE_INIT_PAGE_WBUF_ALIGNMENT;
        return NULL;
    }

    store_engine *e = calloc(1, sizeof(store_engine));
    if (e == NULL) {
        *res = EXTSTORE_INIT_OOM;
        return NULL;
    }

    e->page_size = cf->page_size;
    uint64_t temp_page_count = 0;
    for (f = fh; f != NULL; f = f->next) {
        f->fd = open(f->file, O_RDWR | O_CREAT, 0644);
        if (f->fd < 0) {
            *res = EXTSTORE_INIT_OPEN_FAIL;
#ifdef EXTSTORE_DEBUG
            perror("extstore open");
#endif
            free(e);
            return NULL;
        }
        // use an fcntl lock to help avoid double starting.
        struct flock lock;
        lock.l_type = F_WRLCK;
        lock.l_start = 0;
        lock.l_whence = SEEK_SET;
        lock.l_len = 0;
        if (fcntl(f->fd, F_SETLK, &lock) < 0) {
            *res = EXTSTORE_INIT_OPEN_FAIL;
            free(e);
            return NULL;
        }
        if (ftruncate(f->fd, 0) < 0) {
            *res = EXTSTORE_INIT_OPEN_FAIL;
            free(e);
            return NULL;
        }

        temp_page_count += f->page_count;
        f->offset = 0;
    }

    if (temp_page_count >= UINT16_MAX) {
        *res = EXTSTORE_INIT_TOO_MANY_PAGES;
        free(e);
        return NULL;
    }
    e->page_count = temp_page_count;

    e->pages = calloc(e->page_count, sizeof(store_page));
    if (e->pages == NULL) {
        *res = EXTSTORE_INIT_OOM;
        // FIXME: loop-close. make error label
        free(e);
        return NULL;
    }

    // interleave the pages between devices
    f = NULL; // start at the first device.
    for (i = 0; i < e->page_count; i++) {
        // find next device with available pages
        while (1) {
            // restart the loop
            if (f == NULL || f->next == NULL) {
                f = fh;
            } else {
                f = f->next;
            }
            if (f->page_count) {
                f->page_count--;
                break;
            }
        }
        pthread_mutex_init(&e->pages[i].mutex, NULL);
        e->pages[i].id = i;
        e->pages[i].fd = f->fd;
        e->pages[i].free_bucket = f->free_bucket;
        e->pages[i].offset = f->offset;
        e->pages[i].free = true;
        f->offset += e->page_size;
    }

    // free page buckets allow the app to organize devices by use case
    e->free_page_buckets = calloc(cf->page_buckets, sizeof(store_page *));
    e->page_bucketcount = cf->page_buckets;

    for (i = e->page_count-1; i > 0; i--) {
        e->page_free++;
        if (e->pages[i].free_bucket == 0) {
            e->pages[i].next = e->page_freelist;
            e->page_freelist = &e->pages[i];
        } else {
            int fb = e->pages[i].free_bucket;
            e->pages[i].next = e->free_page_buckets[fb];
            e->free_page_buckets[fb] = &e->pages[i];
        }
    }

    // 0 is magic "page is freed" version
    e->version = 1;

    // scratch data for stats. TODO: malloc failure handle
    e->stats.page_data =
        calloc(e->page_count, sizeof(struct extstore_page_data));
    e->stats.page_count = e->page_count;
    e->stats.page_size = e->page_size;

    // page buckets lazily have pages assigned into them
    e->page_buckets = calloc(cf->page_buckets, sizeof(store_page *));
    e->page_bucketcount = cf->page_buckets;

    // allocate write buffers
    // also IO's to use for shipping to IO thread
    for (i = 0; i < cf->wbuf_count; i++) {
        _store_wbuf *w = wbuf_new(cf->wbuf_size);
        obj_io *io = calloc(1, sizeof(obj_io));
        /* TODO: on error, loop again and free stack. */
        w->next = e->wbuf_stack;
        e->wbuf_stack = w;
        io->next = e->io_stack;
        e->io_stack = io;
    }

    pthread_mutex_init(&e->mutex, NULL);
    pthread_mutex_init(&e->stats_mutex, NULL);

    e->io_depth = cf->io_depth;

    // spawn threads
    e->io_threads = calloc(cf->io_threadcount, sizeof(store_io_thread));
    for (i = 0; i < cf->io_threadcount; i++) {
        pthread_mutex_init(&e->io_threads[i].mutex, NULL);
        pthread_cond_init(&e->io_threads[i].cond, NULL);
        e->io_threads[i].e = e;
        // FIXME: error handling
        pthread_create(&thread, NULL, extstore_io_thread, &e->io_threads[i]);
    }
    e->io_threadcount = cf->io_threadcount;

    e->maint_thread = calloc(1, sizeof(store_maint_thread));
    e->maint_thread->e = e;
    // FIXME: error handling
    pthread_mutex_init(&e->maint_thread->mutex, NULL);
    pthread_cond_init(&e->maint_thread->cond, NULL);
    pthread_create(&thread, NULL, extstore_maint_thread, e->maint_thread);

    extstore_run_maint(e);

    return (void *)e;
}
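
// Illustrative usage sketch (editorial addition, not part of upstream memcached):
// a hedged example of bringing the engine up. "start_extstore" and the literal
// sizes are hypothetical; field names on extstore_conf and extstore_conf_file are
// taken from how this file reads them, with the exact layout living in extstore.h.
// The sizes satisfy the checks above: page_size % wbuf_size == 0, both multiples
// of 2MB, wbuf_count >= page_buckets, and total page_count < 65536.
#if 0
static void *start_extstore(void) {
    struct extstore_conf_file fh = {0};
    fh.file = "/tmp/extstore.data";    // hypothetical backing file
    fh.page_count = 64;                // pages contributed by this file
    fh.free_bucket = 0;                // return pages to the default freelist

    struct extstore_conf cf = {0};
    cf.page_size = 1024 * 1024 * 64;   // 64MB pages
    cf.wbuf_size = 1024 * 1024 * 8;    // 8MB write buffers
    cf.wbuf_count = 4;
    cf.page_buckets = 1;
    cf.io_threadcount = 2;
    cf.io_depth = 1;

    enum extstore_res res;
    void *engine = extstore_init(&fh, &cf, &res);
    if (engine == NULL) {
        fprintf(stderr, "extstore_init failed: %s\n", extstore_err(res));
    }
    return engine;
}
#endif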

void extstore_run_maint(void *ptr) {
    store_engine *e = (store_engine *)ptr;
    pthread_cond_signal(&e->maint_thread->cond);
}

// call with *e locked
static store_page *_allocate_page(store_engine *e, unsigned int bucket,
        unsigned int free_bucket) {
    assert(!e->page_buckets[bucket] || e->page_buckets[bucket]->allocated == e->page_size);
    store_page *tmp = NULL;
    // if a specific free bucket was requested, check there first
    if (free_bucket != 0 && e->free_page_buckets[free_bucket] != NULL) {
        assert(e->page_free > 0);
        tmp = e->free_page_buckets[free_bucket];
        e->free_page_buckets[free_bucket] = tmp->next;
    }
    // failing that, try the global list.
    if (tmp == NULL && e->page_freelist != NULL) {
        tmp = e->page_freelist;
        e->page_freelist = tmp->next;
    }
    E_DEBUG("EXTSTORE: allocating new page\n");
    // page_freelist can be empty if the only free pages are specialized and
    // we didn't just request one.
    if (e->page_free > 0 && tmp != NULL) {
        tmp->next = e->page_buckets[bucket];
        e->page_buckets[bucket] = tmp;
        tmp->active = true;
        tmp->free = false;
        tmp->closed = false;
        tmp->version = _next_version(e);
        tmp->bucket = bucket;
        e->page_free--;
        STAT_INCR(e, page_allocs, 1);
    } else {
        extstore_run_maint(e);
    }
    if (tmp)
        E_DEBUG("EXTSTORE: got page %u\n", tmp->id);
    return tmp;
}

// call with *p locked. locks *e
static void _allocate_wbuf(store_engine *e, store_page *p) {
    _store_wbuf *wbuf = NULL;
    assert(!p->wbuf);
    pthread_mutex_lock(&e->mutex);
    if (e->wbuf_stack) {
        wbuf = e->wbuf_stack;
        e->wbuf_stack = wbuf->next;
        wbuf->next = 0;
    }
    pthread_mutex_unlock(&e->mutex);
    if (wbuf) {
        wbuf->offset = p->allocated;
        p->allocated += wbuf->size;
        wbuf->free = wbuf->size;
        wbuf->buf_pos = wbuf->buf;
        wbuf->full = false;
        wbuf->flushed = false;

        p->wbuf = wbuf;
    }
}

/* callback after wbuf is flushed. can only remove wbuf's from the head onward
 * if successfully flushed, which complicates this routine. each callback
 * attempts to free the wbuf stack, which is finally done when the head wbuf's
 * callback happens.
 * It's rare flushes would happen out of order.
 */
static void _wbuf_cb(void *ep, obj_io *io, int ret) {
    store_engine *e = (store_engine *)ep;
    store_page *p = &e->pages[io->page_id];
    _store_wbuf *w = (_store_wbuf *) io->data;

    // TODO: Examine return code. Not entirely sure how to handle errors.
    // Naive first-pass should probably cause the page to close/free.
    w->flushed = true;
    pthread_mutex_lock(&p->mutex);
    assert(p->wbuf != NULL && p->wbuf == w);
    assert(p->written == w->offset);
    p->written += w->size;
    p->wbuf = NULL;

    if (p->written == e->page_size)
        p->active = false;

    // return the wbuf
    pthread_mutex_lock(&e->mutex);
    w->next = e->wbuf_stack;
    e->wbuf_stack = w;
    // also return the IO we just used.
    io->next = e->io_stack;
    e->io_stack = io;
    pthread_mutex_unlock(&e->mutex);
    pthread_mutex_unlock(&p->mutex);
}

/* Wraps the page's current wbuf in an io and submits it to the IO thread.
 * Called with p locked, locks e.
 */
static void _submit_wbuf(store_engine *e, store_page *p) {
    _store_wbuf *w;
    pthread_mutex_lock(&e->mutex);
    obj_io *io = e->io_stack;
    e->io_stack = io->next;
    pthread_mutex_unlock(&e->mutex);
    w = p->wbuf;

    // zero out the end of the wbuf to allow blind readback of data.
    memset(w->buf + (w->size - w->free), 0, w->free);

    io->next = NULL;
    io->mode = OBJ_IO_WRITE;
    io->page_id = p->id;
    io->data = w;
    io->offset = w->offset;
    io->len = w->size;
    io->buf = w->buf;
    io->cb = _wbuf_cb;

    extstore_submit(e, io);
}

/* engine write function; takes engine, item_io.
 * fast fail if no available write buffer (flushing)
 * lock engine context, find active page, unlock
 * if page full, submit page/buffer to io thread.
 *
 * write is designed to be flaky; if page full, caller must try again to get
 * new page. best if used from a background thread that can harmlessly retry.
 */

int extstore_write_request(void *ptr, unsigned int bucket,
        unsigned int free_bucket, obj_io *io) {
    store_engine *e = (store_engine *)ptr;
    store_page *p;
    int ret = -1;
    if (bucket >= e->page_bucketcount)
        return ret;

    pthread_mutex_lock(&e->mutex);
    p = e->page_buckets[bucket];
    if (!p) {
        p = _allocate_page(e, bucket, free_bucket);
    }
    pthread_mutex_unlock(&e->mutex);
    if (!p)
        return ret;

    pthread_mutex_lock(&p->mutex);

    // FIXME: can't null out page_buckets!!!
    // page is full, clear bucket and retry later.
    if (!p->active ||
            ((!p->wbuf || p->wbuf->full) && p->allocated >= e->page_size)) {
        pthread_mutex_unlock(&p->mutex);
        pthread_mutex_lock(&e->mutex);
        _allocate_page(e, bucket, free_bucket);
        pthread_mutex_unlock(&e->mutex);
        return ret;
    }

    // if io won't fit, submit IO for wbuf and find new one.
    if (p->wbuf && p->wbuf->free < io->len && !p->wbuf->full) {
        _submit_wbuf(e, p);
        p->wbuf->full = true;
    }

    if (!p->wbuf && p->allocated < e->page_size) {
        _allocate_wbuf(e, p);
    }

    // hand over buffer for caller to copy into
    // leaves p locked.
    if (p->wbuf && !p->wbuf->full && p->wbuf->free >= io->len) {
        io->buf = p->wbuf->buf_pos;
        io->page_id = p->id;
        return 0;
    }

    pthread_mutex_unlock(&p->mutex);
    // p->written is incremented post-wbuf flush
    return ret;
}

/* _must_ be called after a successful write_request.
 * fills the rest of io structure.
 */
void extstore_write(void *ptr, obj_io *io) {
    store_engine *e = (store_engine *)ptr;
    store_page *p = &e->pages[io->page_id];

    io->offset = p->wbuf->offset + (p->wbuf->size - p->wbuf->free);
    io->page_version = p->version;
    p->wbuf->buf_pos += io->len;
    p->wbuf->free -= io->len;
    p->bytes_used += io->len;
    p->obj_count++;
    STAT_L(e);
    e->stats.bytes_written += io->len;
    e->stats.bytes_used += io->len;
    e->stats.objects_written++;
    e->stats.objects_used++;
    STAT_UL(e);

    pthread_mutex_unlock(&p->mutex);
}
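
// Illustrative usage sketch (editorial addition, not part of upstream memcached):
// a hedged example of the two-step write protocol described above: reserve space
// in the active write buffer, copy the object in, then commit. "store_blob" is a
// hypothetical caller-side helper; bucket 0 / free_bucket 0 are assumed defaults,
// and retrying after a fast-fail is the caller's responsibility.
#if 0
static int store_blob(void *engine, const char *data, unsigned int len) {
    obj_io io = {0};
    io.len = len;

    // Fast-fail reservation: returns 0 and leaves the page locked with io.buf
    // pointing into the wbuf, or -1 if the caller should retry later.
    if (extstore_write_request(engine, 0, 0, &io) != 0)
        return -1;

    memcpy(io.buf, data, len);   // fill the reserved region
    extstore_write(engine, &io); // records offset/version and unlocks the page

    // io.page_id, io.page_version and io.offset now locate the object on flash.
    return 0;
}
#endif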

/* engine submit function; takes engine, item_io stack.
 * lock io_thread context and add stack?
 * signal io thread to wake.
 * return success.
 */
int extstore_submit(void *ptr, obj_io *io) {
    store_engine *e = (store_engine *)ptr;
    store_io_thread *t = _get_io_thread(e);

    pthread_mutex_lock(&t->mutex);
    if (t->queue == NULL) {
        t->queue = io;
    } else {
        /* Have to put the *io stack at the end of current queue.
         * FIXME: Optimize by tracking tail.
         */
        obj_io *tmp = t->queue;
        while (tmp->next != NULL) {
            tmp = tmp->next;
            assert(tmp != t->queue);
        }
        tmp->next = io;
    }
    // TODO: extstore_submit(ptr, io, count)
    obj_io *tio = io;
    while (tio != NULL) {
        t->depth++;
        tio = tio->next;
    }
    pthread_mutex_unlock(&t->mutex);

    //pthread_mutex_lock(&t->mutex);
    pthread_cond_signal(&t->cond);
    //pthread_mutex_unlock(&t->mutex);
    return 0;
}
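
// Illustrative usage sketch (editorial addition, not part of upstream memcached):
// a hedged example of an asynchronous read through extstore_submit(). The obj_io
// must stay valid until its callback runs on the IO thread; "read_done_cb",
// "fetch_blob" and the completion flag are hypothetical caller-side constructs,
// not extstore API, and real callers would integrate with their own event loop.
#if 0
static void read_done_cb(void *engine, obj_io *io, int ret) {
    // ret is the read/pread result: bytes read, 0, -1 on IO error, or -2 when
    // the page version no longer matches (object was reclaimed).
    if (ret <= 0) {
        fprintf(stderr, "extstore read failed: %d\n", ret);
    }
    *(volatile int *)io->data = 1;   // signal the waiter (simplified)
}

static void fetch_blob(void *engine, unsigned int page_id, uint64_t page_version,
                       unsigned int offset, char *buf, unsigned int len) {
    volatile int done = 0;
    obj_io io = {0};
    io.mode = OBJ_IO_READ;
    io.page_id = page_id;
    io.page_version = page_version;  // checked by the IO thread before reading
    io.offset = offset;
    io.len = len;
    io.buf = buf;
    io.iov = NULL;                   // or supply iov/iovcnt for scatter reads
    io.cb = read_done_cb;
    io.data = (void *)&done;

    extstore_submit(engine, &io);
    while (!done) { /* busy-wait only for illustration */ }
}
#endif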

/* engine note delete function: takes engine, page id, size?
 * note that an item in this page is no longer valid
 */
int extstore_delete(void *ptr, unsigned int page_id, uint64_t page_version,
        unsigned int count, unsigned int bytes) {
    store_engine *e = (store_engine *)ptr;
    // FIXME: validate page_id in bounds
    store_page *p = &e->pages[page_id];
    int ret = 0;

    pthread_mutex_lock(&p->mutex);
    if (!p->closed && p->version == page_version) {
        if (p->bytes_used >= bytes) {
            p->bytes_used -= bytes;
        } else {
            p->bytes_used = 0;
        }

        if (p->obj_count >= count) {
            p->obj_count -= count;
        } else {
            p->obj_count = 0; // caller has bad accounting?
        }
        STAT_L(e);
        e->stats.bytes_used -= bytes;
        e->stats.objects_used -= count;
        STAT_UL(e);

        if (p->obj_count == 0) {
            extstore_run_maint(e);
        }
    } else {
        ret = -1;
    }
    pthread_mutex_unlock(&p->mutex);
    return ret;
}

int extstore_check(void *ptr, unsigned int page_id, uint64_t page_version) {
    store_engine *e = (store_engine *)ptr;
    store_page *p = &e->pages[page_id];
    int ret = 0;

    pthread_mutex_lock(&p->mutex);
    if (p->version != page_version)
        ret = -1;
    pthread_mutex_unlock(&p->mutex);
    return ret;
}
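
// Illustrative usage sketch (editorial addition, not part of upstream memcached):
// a hedged example of invalidation accounting. When the caller drops an item it
// previously stored, it tells the engine so the page can eventually be reclaimed;
// extstore_check() cheaply verifies a stored location is still live. "drop_blob"
// is a hypothetical caller-side helper.
#if 0
static void drop_blob(void *engine, unsigned int page_id, uint64_t page_version,
                      unsigned int len) {
    if (extstore_check(engine, page_id, page_version) == 0) {
        // one object of 'len' bytes is no longer valid in this page
        extstore_delete(engine, page_id, page_version, 1, len);
    }
}
#endif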

/* allows a compactor to say "we're done with this page, kill it." */
void extstore_close_page(void *ptr, unsigned int page_id, uint64_t page_version) {
    store_engine *e = (store_engine *)ptr;
    store_page *p = &e->pages[page_id];

    pthread_mutex_lock(&p->mutex);
    if (!p->closed && p->version == page_version) {
        p->closed = true;
        extstore_run_maint(e);
    }
    pthread_mutex_unlock(&p->mutex);
}

/* Finds an attached wbuf that can satisfy the read.
 * Since wbufs can potentially be flushed to disk out of order, they are only
 * removed as the head of the list successfully flushes to disk.
 */
// call with *p locked
// FIXME: protect from reading past wbuf
static inline int _read_from_wbuf(store_page *p, obj_io *io) {
    _store_wbuf *wbuf = p->wbuf;
    assert(wbuf != NULL);
    assert(io->offset < p->written + wbuf->size);
    if (io->iov == NULL) {
        memcpy(io->buf, wbuf->buf + (io->offset - wbuf->offset), io->len);
    } else {
        int x;
        unsigned int off = io->offset - wbuf->offset;
        // need to loop fill iovecs
        for (x = 0; x < io->iovcnt; x++) {
            struct iovec *iov = &io->iov[x];
            memcpy(iov->iov_base, wbuf->buf + off, iov->iov_len);
            off += iov->iov_len;
        }
    }
    return io->len;
}

/* engine IO thread; takes engine context
 * manage writes/reads
 * runs IO callbacks inline after each IO
 */
// FIXME: protect from reading past page
static void *extstore_io_thread(void *arg) {
    store_io_thread *me = (store_io_thread *)arg;
    store_engine *e = me->e;
    while (1) {
        obj_io *io_stack = NULL;
        pthread_mutex_lock(&me->mutex);
        if (me->queue == NULL) {
            pthread_cond_wait(&me->cond, &me->mutex);
        }

        // Pull and disconnect a batch from the queue
        if (me->queue != NULL) {
            int i;
            obj_io *end = NULL;
            io_stack = me->queue;
            end = io_stack;
            for (i = 1; i < e->io_depth; i++) {
                if (end->next) {
                    end = end->next;
                } else {
                    break;
                }
            }
            me->depth -= i;
            me->queue = end->next;
            end->next = NULL;
        }
        pthread_mutex_unlock(&me->mutex);

        obj_io *cur_io = io_stack;
        while (cur_io) {
            // We need to note next before the callback in case the obj_io
            // gets reused.
            obj_io *next = cur_io->next;
            int ret = 0;
            int do_op = 1;
            store_page *p = &e->pages[cur_io->page_id];
            // TODO: loop if not enough bytes were read/written.
            switch (cur_io->mode) {
                case OBJ_IO_READ:
                    // Page is currently open. deal if read is past the end.
                    pthread_mutex_lock(&p->mutex);
                    if (!p->free && !p->closed && p->version == cur_io->page_version) {
                        if (p->active && cur_io->offset >= p->written) {
                            ret = _read_from_wbuf(p, cur_io);
                            do_op = 0;
                        } else {
                            p->refcount++;
                        }
                        STAT_L(e);
                        e->stats.bytes_read += cur_io->len;
                        e->stats.objects_read++;
                        STAT_UL(e);
                    } else {
                        do_op = 0;
                        ret = -2; // TODO: enum in IO for status?
                    }
                    pthread_mutex_unlock(&p->mutex);
                    if (do_op) {
#if !defined(HAVE_PREAD) || !defined(HAVE_PREADV)
                        // TODO: lseek offset is natively 64-bit on OS X, but
                        // perhaps not on all platforms? Else use lseek64()
                        ret = lseek(p->fd, p->offset + cur_io->offset, SEEK_SET);
                        if (ret >= 0) {
                            if (cur_io->iov == NULL) {
                                ret = read(p->fd, cur_io->buf, cur_io->len);
                            } else {
                                ret = readv(p->fd, cur_io->iov, cur_io->iovcnt);
                            }
                        }
#else
                        if (cur_io->iov == NULL) {
                            ret = pread(p->fd, cur_io->buf, cur_io->len, p->offset + cur_io->offset);
                        } else {
                            ret = preadv(p->fd, cur_io->iov, cur_io->iovcnt, p->offset + cur_io->offset);
                        }
#endif
                    }
                    break;
                case OBJ_IO_WRITE:
                    do_op = 0;
                    // FIXME: Should hold refcount during write. doesn't
                    // currently matter since page can't free while active.
                    ret = pwrite(p->fd, cur_io->buf, cur_io->len, p->offset + cur_io->offset);
                    break;
            }
            if (ret == 0) {
                E_DEBUG("read returned nothing\n");
            }

#ifdef EXTSTORE_DEBUG
            if (ret == -1) {
                perror("read/write op failed");
            }
#endif
            cur_io->cb(e, cur_io, ret);
            if (do_op) {
                pthread_mutex_lock(&p->mutex);
                p->refcount--;
                pthread_mutex_unlock(&p->mutex);
            }
            cur_io = next;
        }
    }

    return NULL;
}

// call with *p locked.
static void _free_page(store_engine *e, store_page *p) {
    store_page *tmp = NULL;
    store_page *prev = NULL;
    E_DEBUG("EXTSTORE: freeing page %u\n", p->id);
    STAT_L(e);
    e->stats.objects_used -= p->obj_count;
    e->stats.bytes_used -= p->bytes_used;
    e->stats.page_reclaims++;
    STAT_UL(e);
    pthread_mutex_lock(&e->mutex);
    // unlink page from bucket list
    tmp = e->page_buckets[p->bucket];
    while (tmp) {
        if (tmp == p) {
            if (prev) {
                prev->next = tmp->next;
            } else {
                e->page_buckets[p->bucket] = tmp->next;
            }
            tmp->next = NULL;
            break;
        }
        prev = tmp;
        tmp = tmp->next;
    }
    // reset most values
    p->version = 0;
    p->obj_count = 0;
    p->bytes_used = 0;
    p->allocated = 0;
    p->written = 0;
    p->bucket = 0;
    p->active = false;
    p->closed = false;
    p->free = true;
    // add to page stack
    // TODO: free_page_buckets first class and remove redundancy?
    if (p->free_bucket != 0) {
        p->next = e->free_page_buckets[p->free_bucket];
        e->free_page_buckets[p->free_bucket] = p;
    } else {
        p->next = e->page_freelist;
        e->page_freelist = p;
    }
    e->page_free++;
    pthread_mutex_unlock(&e->mutex);
}

/* engine maint thread; takes engine context.
 * Uses version to ensure oldest possible objects are being evicted.
 * Needs interface to inform owner of pages with fewer objects or most space
 * free, which can then be actively compacted to avoid eviction.
 *
 * This gets called asynchronously after every page allocation. Could run less
 * often if more pages are free.
 *
 * Another allocation call is required if an attempted free didn't happen
 * due to the page having a refcount.
 */

// TODO: Don't over-evict pages if waiting on refcounts to drop
static void *extstore_maint_thread(void *arg) {
    store_maint_thread *me = (store_maint_thread *)arg;
    store_engine *e = me->e;
    struct extstore_page_data *pd =
        calloc(e->page_count, sizeof(struct extstore_page_data));
    pthread_mutex_lock(&me->mutex);
    while (1) {
        int i;
        bool do_evict = false;
        unsigned int low_page = 0;
        uint64_t low_version = ULLONG_MAX;

        pthread_cond_wait(&me->cond, &me->mutex);
        pthread_mutex_lock(&e->mutex);
        // default freelist requires at least one page free.
        // specialized freelists fall back to default once full.
        if (e->page_free == 0 || e->page_freelist == NULL) {
            do_evict = true;
        }
        pthread_mutex_unlock(&e->mutex);
        memset(pd, 0, sizeof(struct extstore_page_data) * e->page_count);

        for (i = 0; i < e->page_count; i++) {
            store_page *p = &e->pages[i];
            pthread_mutex_lock(&p->mutex);
            pd[p->id].free_bucket = p->free_bucket;
            if (p->active || p->free) {
                pthread_mutex_unlock(&p->mutex);
                continue;
            }
            if (p->obj_count > 0 && !p->closed) {
                pd[p->id].version = p->version;
                pd[p->id].bytes_used = p->bytes_used;
                pd[p->id].bucket = p->bucket;
                // low_version/low_page are only used in the eviction
                // scenario. when we evict, it's only to fill the default page
                // bucket again.
                // TODO: experiment with allowing evicting up to a single page
                // for any specific free bucket. this is *probably* required
                // since it could cause a load bias on default-only devices?
                if (p->free_bucket == 0 && p->version < low_version) {
                    low_version = p->version;
                    low_page = i;
                }
            }
            if ((p->obj_count == 0 || p->closed) && p->refcount == 0) {
                _free_page(e, p);
                // Found a page to free, no longer need to evict.
                do_evict = false;
            }
            pthread_mutex_unlock(&p->mutex);
        }

        if (do_evict && low_version != ULLONG_MAX) {
            store_page *p = &e->pages[low_page];
            E_DEBUG("EXTSTORE: evicting page [%d] [v: %llu]\n",
                    p->id, (unsigned long long) p->version);
            pthread_mutex_lock(&p->mutex);
            if (!p->closed) {
                p->closed = true;
                STAT_L(e);
                e->stats.page_evictions++;
                e->stats.objects_evicted += p->obj_count;
                e->stats.bytes_evicted += p->bytes_used;
                STAT_UL(e);
                if (p->refcount == 0) {
                    _free_page(e, p);
                }
            }
            pthread_mutex_unlock(&p->mutex);
        }

        // copy the page data into engine context so callers can use it from
        // the stats lock.
        STAT_L(e);
        memcpy(e->stats.page_data, pd,
                sizeof(struct extstore_page_data) * e->page_count);
        STAT_UL(e);
    }

    return NULL;
}