"Fossies" - the Fresh Open Source Software Archive

Member "memcached-1.6.9/logger.c" (21 Nov 2020, 28207 Bytes) of package /linux/www/memcached-1.6.9.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "logger.c" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 1.6.8_vs_1.6.9.

    1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
    2 
    3 #include <stdlib.h>
    4 #include <stdio.h>
    5 #include <string.h>
    6 #include <errno.h>
    7 #include <poll.h>
    8 #include <ctype.h>
    9 #include <stdarg.h>
   10 
   11 #if defined(__sun)
   12 #include <atomic.h>
   13 #endif
   14 
   15 #include "memcached.h"
   16 #include "bipbuffer.h"
   17 
   18 #ifdef LOGGER_DEBUG
   19 #define L_DEBUG(...) \
   20     do { \
   21         fprintf(stderr, __VA_ARGS__); \
   22     } while (0)
   23 #else
   24 #define L_DEBUG(...)
   25 #endif
   26 
   27 
   28 /* TODO: put this in a struct and ditch the global vars. */
   29 static logger *logger_stack_head = NULL;
   30 static logger *logger_stack_tail = NULL;
   31 static unsigned int logger_count = 0;
   32 static volatile int do_run_logger_thread = 1;
   33 static pthread_t logger_tid;
   34 pthread_mutex_t logger_stack_lock = PTHREAD_MUTEX_INITIALIZER;
   35 
   36 pthread_key_t logger_key;
   37 
   38 #if !defined(HAVE_GCC_64ATOMICS) && !defined(__sun)
   39 pthread_mutex_t logger_atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
   40 #endif
   41 
   42 #define WATCHER_LIMIT 20
   43 logger_watcher *watchers[20];
   44 struct pollfd watchers_pollfds[20];
   45 int watcher_count = 0;
   46 
   47 /* Should this go somewhere else? */
   48 static const entry_details default_entries[] = {
   49     [LOGGER_ASCII_CMD] = {LOGGER_TEXT_ENTRY, 512, LOG_RAWCMDS, "<%d %s"},
   50     [LOGGER_EVICTION] = {LOGGER_EVICTION_ENTRY, 512, LOG_EVICTIONS, NULL},
   51     [LOGGER_ITEM_GET] = {LOGGER_ITEM_GET_ENTRY, 512, LOG_FETCHERS, NULL},
   52     [LOGGER_ITEM_STORE] = {LOGGER_ITEM_STORE_ENTRY, 512, LOG_MUTATIONS, NULL},
   53     [LOGGER_CRAWLER_STATUS] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS,
   54         "type=lru_crawler crawler=%d lru=%s low_mark=%llu next_reclaims=%llu since_run=%u next_run=%d elapsed=%u examined=%llu reclaimed=%llu"
   55     },
   56     [LOGGER_SLAB_MOVE] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS,
   57         "type=slab_move src=%d dst=%d"
   58     },
   59 #ifdef EXTSTORE
   60     [LOGGER_EXTSTORE_WRITE] = {LOGGER_EXT_WRITE_ENTRY, 512, LOG_EVICTIONS, NULL},
   61     [LOGGER_COMPACT_START] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS,
   62         "type=compact_start id=%lu version=%llu"
   63     },
   64     [LOGGER_COMPACT_ABORT] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS,
   65         "type=compact_abort id=%lu"
   66     },
   67     [LOGGER_COMPACT_READ_START] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS,
   68         "type=compact_read_start id=%lu offset=%llu"
   69     },
   70     [LOGGER_COMPACT_READ_END] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS,
   71         "type=compact_read_end id=%lu offset=%llu rescues=%lu lost=%lu skipped=%lu"
   72     },
   73     [LOGGER_COMPACT_END] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS,
   74         "type=compact_end id=%lu"
   75     },
   76     [LOGGER_COMPACT_FRAGINFO] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS,
   77         "type=compact_fraginfo ratio=%.2f bytes=%lu"
   78     },
   79 #endif
   80 };
   81 
   82 #define WATCHER_ALL -1
   83 static int logger_thread_poll_watchers(int force_poll, int watcher);
   84 
   85 /*************************
   86  * Util functions shared between bg thread and workers
   87  *************************/
   88 
   89 /* Logger GID's can be used by watchers to put logs back into strict order
   90  */
   91 static uint64_t logger_gid = 0;
   92 uint64_t logger_get_gid(void) {
   93 #ifdef HAVE_GCC_64ATOMICS
   94     return __sync_add_and_fetch(&logger_gid, 1);
   95 #elif defined(__sun)
   96     return atomic_inc_64_nv(&logger_gid);
   97 #else
   98     mutex_lock(&logger_atomics_mutex);
   99     uint64_t res = ++logger_gid;
  100     mutex_unlock(&logger_atomics_mutex);
  101     return res;
  102 #endif
  103 }
  104 
  105 void logger_set_gid(uint64_t gid) {
  106 #ifdef HAVE_GCC_64ATOMICS
  107     __sync_add_and_fetch(&logger_gid, gid);
  108 #elif defined(__sun)
  109     atomic_add_64(&logger_gid);
  110 #else
  111     mutex_lock(&logger_atomics_mutex);
  112     logger_gid = gid;
  113     mutex_unlock(&logger_atomics_mutex);
  114 #endif
  115 }
  116 
  117 /* TODO: genericize lists. would be nice to import queue.h if the impact is
  118  * studied... otherwise can just write a local one.
  119  */
  120 /* Add to the list of threads with a logger object */
  121 static void logger_link_q(logger *l) {
  122     pthread_mutex_lock(&logger_stack_lock);
  123     assert(l != logger_stack_head);
  124 
  125     l->prev = 0;
  126     l->next = logger_stack_head;
  127     if (l->next) l->next->prev = l;
  128     logger_stack_head = l;
  129     if (logger_stack_tail == 0) logger_stack_tail = l;
  130     logger_count++;
  131     pthread_mutex_unlock(&logger_stack_lock);
  132     return;
  133 }
  134 
  135 /* Remove from the list of threads with a logger object */
  136 /*static void logger_unlink_q(logger *l) {
  137     pthread_mutex_lock(&logger_stack_lock);
  138     if (logger_stack_head == l) {
  139         assert(l->prev == 0);
  140         logger_stack_head = l->next;
  141     }
  142     if (logger_stack_tail == l) {
  143         assert(l->next == 0);
  144         logger_stack_tail = l->prev;
  145     }
  146     assert(l->next != l);
  147     assert(l->prev != l);
  148 
  149     if (l->next) l->next->prev = l->prev;
  150     if (l->prev) l->prev->next = l->next;
  151     logger_count--;
  152     pthread_mutex_unlock(&logger_stack_lock);
  153     return;
  154 }*/
  155 
  156 /* Called with logger stack locked.
  157  * Iterates over every watcher collecting enabled flags.
  158  */
  159 static void logger_set_flags(void) {
  160     logger *l = NULL;
  161     int x = 0;
  162     uint16_t f = 0; /* logger eflags */
  163 
  164     for (x = 0; x < WATCHER_LIMIT; x++) {
  165         logger_watcher *w = watchers[x];
  166         if (w == NULL)
  167             continue;
  168 
  169         f |= w->eflags;
  170     }
  171     for (l = logger_stack_head; l != NULL; l=l->next) {
  172         pthread_mutex_lock(&l->mutex);
  173         l->eflags = f;
  174         pthread_mutex_unlock(&l->mutex);
  175     }
  176     return;
  177 }
  178 
  179 /*************************
  180  * Logger background thread functions. Aggregates per-worker buffers and
  181  * writes to any watchers.
  182  *************************/
  183 
  184 #define LOGGER_PARSE_SCRATCH 4096
  185 
  186 static int _logger_thread_parse_ise(logentry *e, char *scratch) {
  187     int total;
  188     const char *cmd = "na";
  189     char keybuf[KEY_MAX_URI_ENCODED_LENGTH];
  190     struct logentry_item_store *le = (struct logentry_item_store *) e->data;
  191     const char * const status_map[] = {
  192         "not_stored", "stored", "exists", "not_found", "too_large", "no_memory" };
  193     const char * const cmd_map[] = {
  194         "null", "add", "set", "replace", "append", "prepend", "cas" };
  195 
  196     if (le->cmd <= 6)
  197         cmd = cmd_map[le->cmd];
  198 
  199     uriencode(le->key, keybuf, le->nkey, KEY_MAX_URI_ENCODED_LENGTH);
  200     total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
  201             "ts=%d.%d gid=%llu type=item_store key=%s status=%s cmd=%s ttl=%u clsid=%u cfd=%d\n",
  202             (int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
  203             keybuf, status_map[le->status], cmd, le->ttl, le->clsid, le->sfd);
  204     return total;
  205 }
  206 
  207 static int _logger_thread_parse_ige(logentry *e, char *scratch) {
  208     int total;
  209     struct logentry_item_get *le = (struct logentry_item_get *) e->data;
  210     char keybuf[KEY_MAX_URI_ENCODED_LENGTH];
  211     const char * const was_found_map[] = {
  212         "not_found", "found", "flushed", "expired" };
  213 
  214     uriencode(le->key, keybuf, le->nkey, KEY_MAX_URI_ENCODED_LENGTH);
  215     total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
  216             "ts=%d.%d gid=%llu type=item_get key=%s status=%s clsid=%u cfd=%d\n",
  217             (int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
  218             keybuf, was_found_map[le->was_found], le->clsid, le->sfd);
  219     return total;
  220 }
  221 
  222 static int _logger_thread_parse_ee(logentry *e, char *scratch) {
  223     int total;
  224     char keybuf[KEY_MAX_URI_ENCODED_LENGTH];
  225     struct logentry_eviction *le = (struct logentry_eviction *) e->data;
  226     uriencode(le->key, keybuf, le->nkey, KEY_MAX_URI_ENCODED_LENGTH);
  227     total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
  228             "ts=%d.%d gid=%llu type=eviction key=%s fetch=%s ttl=%lld la=%d clsid=%u\n",
  229             (int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
  230             keybuf, (le->it_flags & ITEM_FETCHED) ? "yes" : "no",
  231             (long long int)le->exptime, le->latime, le->clsid);
  232 
  233     return total;
  234 }
  235 #ifdef EXTSTORE
  236 static int _logger_thread_parse_extw(logentry *e, char *scratch) {
  237     int total;
  238     char keybuf[KEY_MAX_URI_ENCODED_LENGTH];
  239     struct logentry_ext_write *le = (struct logentry_ext_write *) e->data;
  240     uriencode(le->key, keybuf, le->nkey, KEY_MAX_URI_ENCODED_LENGTH);
  241     total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
  242             "ts=%d.%d gid=%llu type=extwrite key=%s fetch=%s ttl=%lld la=%d clsid=%u bucket=%u\n",
  243             (int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
  244             keybuf, (le->it_flags & ITEM_FETCHED) ? "yes" : "no",
  245             (long long int)le->exptime, le->latime, le->clsid, le->bucket);
  246 
  247     return total;
  248 }
  249 #endif
  250 /* Completes rendering of log line. */
  251 static enum logger_parse_entry_ret logger_thread_parse_entry(logentry *e, struct logger_stats *ls,
  252         char *scratch, int *scratch_len) {
  253     int total = 0;
  254 
  255     switch (e->event) {
  256         case LOGGER_TEXT_ENTRY:
  257             total = snprintf(scratch, LOGGER_PARSE_SCRATCH, "ts=%d.%d gid=%llu %s\n",
  258                         (int)e->tv.tv_sec, (int)e->tv.tv_usec,
  259                         (unsigned long long) e->gid, (char *) e->data);
  260             break;
  261         case LOGGER_EVICTION_ENTRY:
  262             total = _logger_thread_parse_ee(e, scratch);
  263             break;
  264 #ifdef EXTSTORE
  265         case LOGGER_EXT_WRITE_ENTRY:
  266             total = _logger_thread_parse_extw(e, scratch);
  267             break;
  268 #endif
  269         case LOGGER_ITEM_GET_ENTRY:
  270             total = _logger_thread_parse_ige(e, scratch);
  271             break;
  272         case LOGGER_ITEM_STORE_ENTRY:
  273             total = _logger_thread_parse_ise(e, scratch);
  274             break;
  275 
  276     }
  277 
  278     if (total >= LOGGER_PARSE_SCRATCH || total <= 0) {
  279         L_DEBUG("LOGGER: Failed to flatten log entry!\n");
  280         return LOGGER_PARSE_ENTRY_FAILED;
  281     } else {
  282         *scratch_len = total;
  283     }
  284 
  285     return LOGGER_PARSE_ENTRY_OK;
  286 }
  287 
  288 /* Writes flattened entry to available watchers */
  289 static void logger_thread_write_entry(logentry *e, struct logger_stats *ls,
  290         char *scratch, int scratch_len) {
  291     int x, total;
  292     /* Write the line into available watchers with matching flags */
  293     for (x = 0; x < WATCHER_LIMIT; x++) {
  294         logger_watcher *w = watchers[x];
  295         char *skip_scr = NULL;
  296         if (w == NULL || (e->eflags & w->eflags) == 0)
  297             continue;
  298 
  299          /* Avoid poll()'ing constantly when buffer is full by resetting a
  300          * flag periodically.
  301          */
  302         while (!w->failed_flush &&
  303                 (skip_scr = (char *) bipbuf_request(w->buf, scratch_len + 128)) == NULL) {
  304             if (logger_thread_poll_watchers(0, x) <= 0) {
  305                 L_DEBUG("LOGGER: Watcher had no free space for line of size (%d)\n", scratch_len + 128);
  306                 w->failed_flush = true;
  307             }
  308         }
  309 
  310         if (w->failed_flush) {
  311             L_DEBUG("LOGGER: Fast skipped for watcher [%d] due to failed_flush\n", w->sfd);
  312             w->skipped++;
  313             ls->watcher_skipped++;
  314             continue;
  315         }
  316 
  317         if (w->skipped > 0) {
  318             total = snprintf(skip_scr, 128, "skipped=%llu\n", (unsigned long long) w->skipped);
  319             if (total >= 128 || total <= 0) {
  320                 L_DEBUG("LOGGER: Failed to flatten skipped message into watcher [%d]\n", w->sfd);
  321                 w->skipped++;
  322                 ls->watcher_skipped++;
  323                 continue;
  324             }
  325             bipbuf_push(w->buf, total);
  326             w->skipped = 0;
  327         }
  328         /* Can't fail because bipbuf_request succeeded. */
  329         bipbuf_offer(w->buf, (unsigned char *) scratch, scratch_len);
  330         ls->watcher_sent++;
  331     }
  332 }
  333 
  334 /* Called with logger stack locked.
  335  * Releases every chunk associated with a watcher and closes the connection.
  336  * We can't presently send a connection back to the worker for further
  337  * processing.
  338  */
  339 static void logger_thread_close_watcher(logger_watcher *w) {
  340     L_DEBUG("LOGGER: Closing dead watcher\n");
  341     watchers[w->id] = NULL;
  342     sidethread_conn_close(w->c);
  343     watcher_count--;
  344     bipbuf_free(w->buf);
  345     free(w);
  346     logger_set_flags();
  347 }
  348 
  349 /* Reads a particular worker thread's available bipbuf bytes. Parses each log
  350  * entry into the watcher buffers.
  351  */
  352 static int logger_thread_read(logger *l, struct logger_stats *ls) {
  353     unsigned int size;
  354     unsigned int pos = 0;
  355     unsigned char *data;
  356     char scratch[LOGGER_PARSE_SCRATCH];
  357     logentry *e;
  358     pthread_mutex_lock(&l->mutex);
  359     data = bipbuf_peek_all(l->buf, &size);
  360     pthread_mutex_unlock(&l->mutex);
  361 
  362     if (data == NULL) {
  363         return 0;
  364     }
  365     L_DEBUG("LOGGER: Got %d bytes from bipbuffer\n", size);
  366 
  367     /* parse buffer */
  368     while (pos < size && watcher_count > 0) {
  369         enum logger_parse_entry_ret ret;
  370         int scratch_len = 0;
  371         e = (logentry *) (data + pos);
  372         ret = logger_thread_parse_entry(e, ls, scratch, &scratch_len);
  373         if (ret != LOGGER_PARSE_ENTRY_OK) {
  374             /* TODO: stats counter */
  375             fprintf(stderr, "LOGGER: Failed to parse log entry\n");
  376         } else {
  377             logger_thread_write_entry(e, ls, scratch, scratch_len);
  378         }
  379         pos += sizeof(logentry) + e->size + e->pad;
  380     }
  381     assert(pos <= size);
  382 
  383     pthread_mutex_lock(&l->mutex);
  384     data = bipbuf_poll(l->buf, size);
  385     ls->worker_written += l->written;
  386     ls->worker_dropped += l->dropped;
  387     l->written = 0;
  388     l->dropped = 0;
  389     pthread_mutex_unlock(&l->mutex);
  390     if (data == NULL) {
  391         fprintf(stderr, "LOGGER: unexpectedly couldn't advance buf pointer\n");
  392         assert(0);
  393     }
  394     return size; /* maybe the count of objects iterated? */
  395 }
  396 
  397 /* Since the event loop code isn't reusable without a refactor, and we have a
  398  * limited number of potential watchers, we run our own poll loop.
  399  * This calls poll() unnecessarily during write flushes, should be possible to
  400  * micro-optimize later.
  401  *
  402  * This flushes buffers attached to watchers, iterating through the bytes set
  403  * to each worker. Also checks for readability in case client connection was
  404  * closed.
  405  *
  406  * Allows a specific watcher to be flushed (if buf full)
  407  */
  408 static int logger_thread_poll_watchers(int force_poll, int watcher) {
  409     int x;
  410     int nfd = 0;
  411     unsigned char *data;
  412     unsigned int data_size = 0;
  413     int flushed = 0;
  414 
  415     for (x = 0; x < WATCHER_LIMIT; x++) {
  416         logger_watcher *w = watchers[x];
  417         if (w == NULL || (watcher != WATCHER_ALL && x != watcher))
  418             continue;
  419 
  420         data = bipbuf_peek_all(w->buf, &data_size);
  421         if (data != NULL) {
  422             watchers_pollfds[nfd].fd = w->sfd;
  423             watchers_pollfds[nfd].events = POLLOUT;
  424             nfd++;
  425         } else if (force_poll) {
  426             watchers_pollfds[nfd].fd = w->sfd;
  427             watchers_pollfds[nfd].events = POLLIN;
  428             nfd++;
  429         }
  430         /* This gets set after a call to poll, and should be used to gate on
  431          * calling poll again.
  432          */
  433         w->failed_flush = false;
  434     }
  435 
  436     if (nfd == 0)
  437         return 0;
  438 
  439     //L_DEBUG("LOGGER: calling poll() [data_size: %d]\n", data_size);
  440     int ret = poll(watchers_pollfds, nfd, 0);
  441 
  442     if (ret < 0) {
  443         perror("something failed with logger thread watcher fd polling");
  444         return -1;
  445     }
  446 
  447     nfd = 0;
  448     for (x = 0; x < WATCHER_LIMIT; x++) {
  449         logger_watcher *w = watchers[x];
  450         if (w == NULL || (watcher != WATCHER_ALL && x != watcher))
  451             continue;
  452 
  453         data_size = 0;
  454         /* Early detection of a disconnect. Otherwise we have to wait until
  455          * the next write
  456          */
  457         if (watchers_pollfds[nfd].revents & POLLIN) {
  458             char buf[1];
  459             int res = ((conn*)w->c)->read(w->c, buf, 1);
  460             if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
  461                 L_DEBUG("LOGGER: watcher closed remotely\n");
  462                 logger_thread_close_watcher(w);
  463                 nfd++;
  464                 continue;
  465             }
  466         }
  467         if ((data = bipbuf_peek_all(w->buf, &data_size)) != NULL) {
  468             if (watchers_pollfds[nfd].revents & (POLLHUP|POLLERR)) {
  469                 L_DEBUG("LOGGER: watcher closed during poll() call\n");
  470                 logger_thread_close_watcher(w);
  471             } else if (watchers_pollfds[nfd].revents & POLLOUT) {
  472                 int total = 0;
  473 
  474                 /* We can write a bit. */
  475                 switch (w->t) {
  476                     case LOGGER_WATCHER_STDERR:
  477                         total = fwrite(data, 1, data_size, stderr);
  478                         break;
  479                     case LOGGER_WATCHER_CLIENT:
  480                         total = ((conn*)w->c)->write(w->c, data, data_size);
  481                         break;
  482                 }
  483 
  484                 L_DEBUG("LOGGER: poll() wrote %d to %d (data_size: %d) (bipbuf_used: %d)\n", total, w->sfd,
  485                         data_size, bipbuf_used(w->buf));
  486                 if (total == -1) {
  487                     if (errno != EAGAIN && errno != EWOULDBLOCK) {
  488                         logger_thread_close_watcher(w);
  489                     }
  490                     L_DEBUG("LOGGER: watcher hit EAGAIN\n");
  491                 } else if (total == 0) {
  492                     logger_thread_close_watcher(w);
  493                 } else {
  494                     bipbuf_poll(w->buf, total);
  495                     flushed += total;
  496                 }
  497             }
  498         }
  499         nfd++;
  500     }
  501     return flushed;
  502 }
  503 
  504 static void logger_thread_sum_stats(struct logger_stats *ls) {
  505     STATS_LOCK();
  506     stats.log_worker_dropped  += ls->worker_dropped;
  507     stats.log_worker_written  += ls->worker_written;
  508     stats.log_watcher_skipped += ls->watcher_skipped;
  509     stats.log_watcher_sent    += ls->watcher_sent;
  510     STATS_UNLOCK();
  511 }
  512 
  513 #define MAX_LOGGER_SLEEP 1000000
  514 #define MIN_LOGGER_SLEEP 1000
  515 
  516 /* Primary logger thread routine */
  517 static void *logger_thread(void *arg) {
  518     useconds_t to_sleep = MIN_LOGGER_SLEEP;
  519     L_DEBUG("LOGGER: Starting logger thread\n");
  520     // TODO: If we ever have item references in the logger code, will need to
  521     // ensure everything is dequeued before stopping the thread.
  522     while (do_run_logger_thread) {
  523         int found_logs = 0;
  524         logger *l;
  525         struct logger_stats ls;
  526         memset(&ls, 0, sizeof(struct logger_stats));
  527 
  528         /* only sleep if we're *above* the minimum */
  529         if (to_sleep > MIN_LOGGER_SLEEP)
  530             usleep(to_sleep);
  531 
  532         /* Call function to iterate each logger. */
  533         pthread_mutex_lock(&logger_stack_lock);
  534         for (l = logger_stack_head; l != NULL; l=l->next) {
  535             /* lock logger, call function to manipulate it */
  536             found_logs += logger_thread_read(l, &ls);
  537         }
  538 
  539         logger_thread_poll_watchers(1, WATCHER_ALL);
  540         pthread_mutex_unlock(&logger_stack_lock);
  541 
  542         /* TODO: abstract into a function and share with lru_crawler */
  543         if (!found_logs) {
  544             if (to_sleep < MAX_LOGGER_SLEEP)
  545                 to_sleep += to_sleep / 8;
  546             if (to_sleep > MAX_LOGGER_SLEEP)
  547                 to_sleep = MAX_LOGGER_SLEEP;
  548         } else {
  549             to_sleep /= 2;
  550             if (to_sleep < MIN_LOGGER_SLEEP)
  551                 to_sleep = MIN_LOGGER_SLEEP;
  552         }
  553         logger_thread_sum_stats(&ls);
  554     }
  555 
  556     return NULL;
  557 }
  558 
  559 static int start_logger_thread(void) {
  560     int ret;
  561     do_run_logger_thread = 1;
  562     if ((ret = pthread_create(&logger_tid, NULL,
  563                               logger_thread, NULL)) != 0) {
  564         fprintf(stderr, "Can't start logger thread: %s\n", strerror(ret));
  565         return -1;
  566     }
  567     return 0;
  568 }
  569 
  570 static int stop_logger_thread(void) {
  571     do_run_logger_thread = 0;
  572     pthread_join(logger_tid, NULL);
  573     return 0;
  574 }
  575 
  576 /*************************
  577  * Public functions for submitting logs and starting loggers from workers.
  578  *************************/
  579 
  580 /* Global logger thread start/init */
  581 void logger_init(void) {
  582     /* TODO: auto destructor when threads exit */
  583     /* TODO: error handling */
  584 
  585     /* init stack for iterating loggers */
  586     logger_stack_head = 0;
  587     logger_stack_tail = 0;
  588     pthread_key_create(&logger_key, NULL);
  589 
  590     if (start_logger_thread() != 0) {
  591         abort();
  592     }
  593 
  594     /* This is what adding a STDERR watcher looks like. should replace old
  595      * "verbose" settings. */
  596     //logger_add_watcher(NULL, 0);
  597     return;
  598 }
  599 
  600 void logger_stop(void) {
  601     stop_logger_thread();
  602 }
  603 
  604 /* called *from* the thread using a logger.
  605  * initializes the per-thread bipbuf, links it into the list of loggers
  606  */
  607 logger *logger_create(void) {
  608     L_DEBUG("LOGGER: Creating and linking new logger instance\n");
  609     logger *l = calloc(1, sizeof(logger));
  610     if (l == NULL) {
  611         return NULL;
  612     }
  613 
  614     l->buf = bipbuf_new(settings.logger_buf_size);
  615     if (l->buf == NULL) {
  616         free(l);
  617         return NULL;
  618     }
  619 
  620     l->entry_map = default_entries;
  621 
  622     pthread_mutex_init(&l->mutex, NULL);
  623     pthread_setspecific(logger_key, l);
  624 
  625     /* add to list of loggers */
  626     logger_link_q(l);
  627     return l;
  628 }
  629 
  630 /* helpers for logger_log */
  631 
  632 static void _logger_log_evictions(logentry *e, item *it) {
  633     struct logentry_eviction *le = (struct logentry_eviction *) e->data;
  634     le->exptime = (it->exptime > 0) ? (long long int)(it->exptime - current_time) : (long long int) -1;
  635     le->latime = current_time - it->time;
  636     le->it_flags = it->it_flags;
  637     le->nkey = it->nkey;
  638     le->clsid = ITEM_clsid(it);
  639     memcpy(le->key, ITEM_key(it), it->nkey);
  640     e->size = sizeof(struct logentry_eviction) + le->nkey;
  641 }
  642 #ifdef EXTSTORE
  643 /* TODO: When more logging endpoints are done and the extstore API has matured
  644  * more, this could be merged with above and print different types of
  645  * expulsion events.
  646  */
  647 static void _logger_log_ext_write(logentry *e, item *it, uint8_t bucket) {
  648     struct logentry_ext_write *le = (struct logentry_ext_write *) e->data;
  649     le->exptime = (it->exptime > 0) ? (long long int)(it->exptime - current_time) : (long long int) -1;
  650     le->latime = current_time - it->time;
  651     le->it_flags = it->it_flags;
  652     le->nkey = it->nkey;
  653     le->clsid = ITEM_clsid(it);
  654     le->bucket = bucket;
  655     memcpy(le->key, ITEM_key(it), it->nkey);
  656     e->size = sizeof(struct logentry_ext_write) + le->nkey;
  657 }
  658 #endif
  659 /* 0 == nf, 1 == found. 2 == flushed. 3 == expired.
  660  * might be useful to store/print the flags an item has?
  661  * could also collapse this and above code into an "item status" struct. wait
  662  * for more endpoints to be written before making it generic, though.
  663  */
  664 static void _logger_log_item_get(logentry *e, const int was_found, const char *key,
  665         const int nkey, const uint8_t clsid, const int sfd) {
  666     struct logentry_item_get *le = (struct logentry_item_get *) e->data;
  667     le->was_found = was_found;
  668     le->nkey = nkey;
  669     le->clsid = clsid;
  670     memcpy(le->key, key, nkey);
  671     le->sfd = sfd;
  672     e->size = sizeof(struct logentry_item_get) + nkey;
  673 }
  674 
  675 static void _logger_log_item_store(logentry *e, const enum store_item_type status,
  676         const int comm, char *key, const int nkey, rel_time_t ttl, const uint8_t clsid, int sfd) {
  677     struct logentry_item_store *le = (struct logentry_item_store *) e->data;
  678     le->status = status;
  679     le->cmd = comm;
  680     le->nkey = nkey;
  681     le->clsid = clsid;
  682     if (ttl != 0) {
  683         le->ttl = ttl - current_time;
  684     } else {
  685         le->ttl = 0;
  686     }
  687     memcpy(le->key, key, nkey);
  688     le->sfd = sfd;
  689     e->size = sizeof(struct logentry_item_store) + nkey;
  690 }
  691 
  692 /* Public function for logging an entry.
  693  * Tries to encapsulate as much of the formatting as possible to simplify the
  694  * caller's code.
  695  */
  696 enum logger_ret_type logger_log(logger *l, const enum log_entry_type event, const void *entry, ...) {
  697     bipbuf_t *buf = l->buf;
  698     bool nospace = false;
  699     va_list ap;
  700     int total = 0;
  701     logentry *e;
  702 
  703     const entry_details *d = &l->entry_map[event];
  704     int reqlen = d->reqlen;
  705 
  706     pthread_mutex_lock(&l->mutex);
  707     /* Request a maximum length of data to write to */
  708     e = (logentry *) bipbuf_request(buf, (sizeof(logentry) + reqlen));
  709     if (e == NULL) {
  710         pthread_mutex_unlock(&l->mutex);
  711         l->dropped++;
  712         return LOGGER_RET_NOSPACE;
  713     }
  714     e->event = d->subtype;
  715     e->pad = 0;
  716     e->gid = logger_get_gid();
  717     /* TODO: Could pass this down as an argument now that we're using
  718      * LOGGER_LOG() macro.
  719      */
  720     e->eflags = d->eflags;
  721     /* Noting time isn't optional. A feature may be added to avoid rendering
  722      * time and/or gid to a logger.
  723      */
  724     gettimeofday(&e->tv, NULL);
  725 
  726     switch (d->subtype) {
  727         case LOGGER_TEXT_ENTRY:
  728             va_start(ap, entry);
  729             total = vsnprintf((char *) e->data, reqlen, d->format, ap);
  730             va_end(ap);
  731             if (total >= reqlen || total <= 0) {
  732                 fprintf(stderr, "LOGGER: Failed to vsnprintf a text entry: (total) %d\n", total);
  733                 break;
  734             }
  735             e->size = total + 1; /* null byte */
  736 
  737             break;
  738         case LOGGER_EVICTION_ENTRY:
  739             _logger_log_evictions(e, (item *)entry);
  740             break;
  741 #ifdef EXTSTORE
  742         case LOGGER_EXT_WRITE_ENTRY:
  743             va_start(ap, entry);
  744             int ew_bucket = va_arg(ap, int);
  745             va_end(ap);
  746             _logger_log_ext_write(e, (item *)entry, ew_bucket);
  747             break;
  748 #endif
  749         case LOGGER_ITEM_GET_ENTRY:
  750             va_start(ap, entry);
  751             int was_found = va_arg(ap, int);
  752             char *key = va_arg(ap, char *);
  753             size_t nkey = va_arg(ap, size_t);
  754             uint8_t gclsid = va_arg(ap, int);
  755             int gsfd = va_arg(ap, int);
  756             _logger_log_item_get(e, was_found, key, nkey, gclsid, gsfd);
  757             va_end(ap);
  758             break;
  759         case LOGGER_ITEM_STORE_ENTRY:
  760             va_start(ap, entry);
  761             enum store_item_type status = va_arg(ap, enum store_item_type);
  762             int comm = va_arg(ap, int);
  763             char *skey = va_arg(ap, char *);
  764             size_t snkey = va_arg(ap, size_t);
  765             rel_time_t sttl = va_arg(ap, rel_time_t);
  766             uint8_t sclsid = va_arg(ap, int);
  767             int ssfd = va_arg(ap, int);
  768             _logger_log_item_store(e, status, comm, skey, snkey, sttl, sclsid, ssfd);
  769             va_end(ap);
  770             break;
  771     }
  772 
  773 #ifdef NEED_ALIGN
  774     /* Need to ensure *next* request is aligned. */
  775     if (sizeof(logentry) + e->size % 8 != 0) {
  776         e->pad = 8 - (sizeof(logentry) + e->size % 8);
  777     }
  778 #endif
  779 
  780     /* Push pointer forward by the actual amount required */
  781     if (bipbuf_push(buf, (sizeof(logentry) + e->size + e->pad)) == 0) {
  782         fprintf(stderr, "LOGGER: Failed to bipbuf push a text entry\n");
  783         pthread_mutex_unlock(&l->mutex);
  784         return LOGGER_RET_ERR;
  785     }
  786     l->written++;
  787     L_DEBUG("LOGGER: Requested %d bytes, wrote %lu bytes\n", reqlen,
  788             (sizeof(logentry) + e->size));
  789 
  790     pthread_mutex_unlock(&l->mutex);
  791 
  792     if (nospace) {
  793         return LOGGER_RET_NOSPACE;
  794     } else {
  795         return LOGGER_RET_OK;
  796     }
  797 }
  798 
  799 /* Passes a client connection socket from a primary worker thread to the
  800  * logger thread. Caller *must* event_del() the client before handing it over.
  801  * Presently there's no way to hand the client back to the worker thread.
  802  */
  803 enum logger_add_watcher_ret logger_add_watcher(void *c, const int sfd, uint16_t f) {
  804     int x;
  805     logger_watcher *w = NULL;
  806     pthread_mutex_lock(&logger_stack_lock);
  807     if (watcher_count >= WATCHER_LIMIT) {
  808         pthread_mutex_unlock(&logger_stack_lock);
  809         return LOGGER_ADD_WATCHER_TOO_MANY;
  810     }
  811 
  812     for (x = 0; x < WATCHER_LIMIT-1; x++) {
  813         if (watchers[x] == NULL)
  814             break;
  815     }
  816 
  817     w = calloc(1, sizeof(logger_watcher));
  818     if (w == NULL) {
  819         pthread_mutex_unlock(&logger_stack_lock);
  820         return LOGGER_ADD_WATCHER_FAILED;
  821     }
  822     w->c = c;
  823     w->sfd = sfd;
  824     if (sfd == 0 && c == NULL) {
  825         w->t = LOGGER_WATCHER_STDERR;
  826     } else {
  827         w->t = LOGGER_WATCHER_CLIENT;
  828     }
  829     w->id = x;
  830     w->eflags = f;
  831     w->buf = bipbuf_new(settings.logger_watcher_buf_size);
  832     if (w->buf == NULL) {
  833         free(w);
  834         pthread_mutex_unlock(&logger_stack_lock);
  835         return LOGGER_ADD_WATCHER_FAILED;
  836     }
  837     bipbuf_offer(w->buf, (unsigned char *) "OK\r\n", 4);
  838 
  839     watchers[x] = w;
  840     watcher_count++;
  841     /* Update what flags the global logs will watch */
  842     logger_set_flags();
  843 
  844     pthread_mutex_unlock(&logger_stack_lock);
  845     return LOGGER_ADD_WATCHER_OK;
  846 }