"Fossies" - the Fresh Open Source Software Archive

Member "memcached-1.6.15/logger.c" (30 Mar 2022, 33042 Bytes) of package /linux/www/memcached-1.6.15.tar.gz:


As a special service, "Fossies" has tried to format the requested source page as HTML, using (guessed) C and C++ syntax highlighting (style: standard) with prefixed line numbers and a code-folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "logger.c", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 1.6.14_vs_1.6.15.

    1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
    2 
    3 #include <arpa/inet.h>
    4 #include <stdlib.h>
    5 #include <stdio.h>
    6 #include <string.h>
    7 #include <errno.h>
    8 #include <poll.h>
    9 #include <ctype.h>
   10 #include <stdarg.h>
   11 
   12 #if defined(__sun)
   13 #include <atomic.h>
   14 #endif
   15 
   16 #include "memcached.h"
   17 #include "bipbuffer.h"
   18 
   19 #ifdef LOGGER_DEBUG
   20 #define L_DEBUG(...) \
   21     do { \
   22         fprintf(stderr, __VA_ARGS__); \
   23     } while (0)
   24 #else
   25 #define L_DEBUG(...)
   26 #endif
   27 
   28 
   29 /* TODO: put this in a struct and ditch the global vars. */
   30 static logger *logger_stack_head = NULL;
   31 static logger *logger_stack_tail = NULL;
   32 static unsigned int logger_count = 0;
   33 static volatile int do_run_logger_thread = 1;
   34 static pthread_t logger_tid;
   35 pthread_mutex_t logger_stack_lock = PTHREAD_MUTEX_INITIALIZER;
   36 pthread_cond_t logger_stack_cond = PTHREAD_COND_INITIALIZER;
   37 
   38 pthread_key_t logger_key;
   39 
   40 #if !defined(HAVE_GCC_64ATOMICS) && !defined(__sun)
   41 pthread_mutex_t logger_atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
   42 #endif
   43 
   44 #define WATCHER_LIMIT 20
   45 logger_watcher *watchers[20];
   46 struct pollfd watchers_pollfds[20];
   47 int watcher_count = 0;
   48 
   49 #define WATCHER_ALL -1
   50 static int logger_thread_poll_watchers(int force_poll, int watcher);
   51 
   52 /* helpers for logger_log */
   53 
   54 static void _logger_log_text(logentry *e, const entry_details *d, const void *entry, va_list ap) {
   55     int reqlen = d->reqlen;
   56     int total = vsnprintf((char *) e->data, reqlen, d->format, ap);
   57     if (total <= 0) {
   58         fprintf(stderr, "LOGGER: Failed to vsnprintf a text entry: (total) %d\n", total);
   59     }
   60     e->size = total + 1; // null byte
   61 }
   62 
   63 static void _logger_log_evictions(logentry *e, const entry_details *d, const void *entry, va_list ap) {
   64     item *it = (item *)entry;
   65     struct logentry_eviction *le = (struct logentry_eviction *) e->data;
   66 
   67     le->exptime = (it->exptime > 0) ? (long long int)(it->exptime - current_time) : (long long int) -1;
   68     le->latime = current_time - it->time;
   69     le->it_flags = it->it_flags;
   70     le->nkey = it->nkey;
   71     le->nbytes = it->nbytes;
   72     le->clsid = ITEM_clsid(it);
   73     memcpy(le->key, ITEM_key(it), it->nkey);
   74     e->size = sizeof(struct logentry_eviction) + le->nkey;
   75 }
   76 #ifdef EXTSTORE
   77 static void _logger_log_ext_write(logentry *e, const entry_details *d, const void *entry, va_list ap) {
   78     item *it = (item *)entry;
   79     int ew_bucket = va_arg(ap, int);
   80 
   81     struct logentry_ext_write *le = (struct logentry_ext_write *) e->data;
   82     le->exptime = (it->exptime > 0) ? (long long int)(it->exptime - current_time) : (long long int) -1;
   83     le->latime = current_time - it->time;
   84     le->it_flags = it->it_flags;
   85     le->nkey = it->nkey;
   86     le->clsid = ITEM_clsid(it);
   87     le->bucket = (uint8_t)ew_bucket;
   88     memcpy(le->key, ITEM_key(it), it->nkey);
   89     e->size = sizeof(struct logentry_ext_write) + le->nkey;
   90 }
   91 #endif
   92 // 0 == nf, 1 == found. 2 == flushed. 3 == expired.
   93 // might be useful to store/print the flags an item has?
   94 // could also collapse this and above code into an "item status" struct. wait
   95 // for more endpoints to be written before making it generic, though.
   96 static void _logger_log_item_get(logentry *e, const entry_details *d, const void *entry, va_list ap) {
   97     int was_found = va_arg(ap, int);
   98     char *key = va_arg(ap, char *);
   99     size_t nkey = va_arg(ap, size_t);
  100     int nbytes = va_arg(ap, int);
  101     uint8_t clsid = va_arg(ap, int);
  102     int sfd = va_arg(ap, int);
  103 
  104     struct logentry_item_get *le = (struct logentry_item_get *) e->data;
  105     le->was_found = was_found;
  106     le->nkey = nkey;
  107     le->nbytes = nbytes;
  108     le->clsid = clsid;
  109     memcpy(le->key, key, nkey);
  110     le->sfd = sfd;
  111     e->size = sizeof(struct logentry_item_get) + nkey;
  112 }
  113 
  114 static void _logger_log_item_store(logentry *e, const entry_details *d, const void *entry, va_list ap) {
  115     enum store_item_type status = va_arg(ap, enum store_item_type);
  116     int comm = va_arg(ap, int);
  117     char *key = va_arg(ap, char *);
  118     size_t nkey = va_arg(ap, size_t);
  119     int nbytes = va_arg(ap, int);
  120     rel_time_t ttl = va_arg(ap, rel_time_t);
  121     uint8_t clsid = va_arg(ap, int);
  122     int sfd = va_arg(ap, int);
  123 
  124     struct logentry_item_store *le = (struct logentry_item_store *) e->data;
  125     le->status = status;
  126     le->cmd = comm;
  127     le->nkey = nkey;
  128     le->nbytes = nbytes;
  129     le->clsid = clsid;
  130     if (ttl != 0) {
  131         le->ttl = ttl - current_time;
  132     } else {
  133         le->ttl = 0;
  134     }
  135     memcpy(le->key, key, nkey);
  136     le->sfd = sfd;
  137     e->size = sizeof(struct logentry_item_store) + nkey;
  138 }
  139 
  140 static void _logger_log_conn_event(logentry *e, const entry_details *d, const void *entry, va_list ap) {
  141     struct sockaddr_in6 *addr = va_arg(ap, struct sockaddr_in6 *);
  142     socklen_t addrlen = va_arg(ap, socklen_t);
  143     enum network_transport transport = va_arg(ap, enum network_transport);
  144     enum close_reasons reason = va_arg(ap, enum close_reasons);
  145     int sfd = va_arg(ap, int);
  146 
  147     struct logentry_conn_event *le = (struct logentry_conn_event *) e->data;
  148 
  149     memcpy(&le->addr, addr, addrlen);
  150     le->sfd = sfd;
  151     le->transport = transport;
  152     le->reason = reason;
  153     e->size = sizeof(struct logentry_conn_event);
  154 }
  155 
  156 /*************************
  157  * Util functions used by the logger background thread
  158  *************************/
  159 
  160 static int _logger_util_addr_endpoint(struct sockaddr_in6 *addr, char *rip,
  161         size_t riplen, unsigned short *rport) {
  162     memset(rip, 0, riplen);
  163 
  164     switch (addr->sin6_family) {
  165         case AF_INET:
  166             inet_ntop(AF_INET, &((struct sockaddr_in *) addr)->sin_addr,
  167                     rip, riplen - 1);
  168             *rport = ntohs(((struct sockaddr_in *) addr)->sin_port);
  169             break;
  170         case AF_INET6:
  171             inet_ntop(AF_INET6, &((struct sockaddr_in6 *) addr)->sin6_addr,
  172                     rip, riplen - 1);
  173             *rport = ntohs(((struct sockaddr_in6 *) addr)->sin6_port);
  174             break;
  175 #ifndef DISABLE_UNIX_SOCKET
  176         // Connections on Unix socket transports have c->request_addr zeroed out.
  177         case AF_UNSPEC:
  178         case AF_UNIX:
  179             strncpy(rip, "unix", strlen("unix") + 1);
  180             *rport = 0;
  181             break;
  182 #endif // #ifndef DISABLE_UNIX_SOCKET
  183     }
  184 
  185     return 0;
  186 }
  187 
  188 /*************************
  189  * Logger background thread functions. Aggregates per-worker buffers and
  190  * writes to any watchers.
  191  *************************/
  192 
  193 #define LOGGER_PARSE_SCRATCH 4096
  194 
  195 static int _logger_parse_text(logentry *e, char *scratch) {
  196     return snprintf(scratch, LOGGER_PARSE_SCRATCH, "ts=%d.%d gid=%llu %s\n",
  197             (int)e->tv.tv_sec, (int)e->tv.tv_usec,
  198             (unsigned long long) e->gid, (char *) e->data);
  199 }
  200 
  201 static int _logger_parse_ise(logentry *e, char *scratch) {
  202     int total;
  203     const char *cmd = "na";
  204     char keybuf[KEY_MAX_URI_ENCODED_LENGTH];
  205     struct logentry_item_store *le = (struct logentry_item_store *) e->data;
  206     const char * const status_map[] = {
  207         "not_stored", "stored", "exists", "not_found", "too_large", "no_memory" };
  208     const char * const cmd_map[] = {
  209         "null", "add", "set", "replace", "append", "prepend", "cas" };
  210 
  211     if (le->cmd <= 6)
  212         cmd = cmd_map[le->cmd];
  213 
  214     uriencode(le->key, keybuf, le->nkey, KEY_MAX_URI_ENCODED_LENGTH);
  215     total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
  216             "ts=%d.%d gid=%llu type=item_store key=%s status=%s cmd=%s ttl=%u clsid=%u cfd=%d size=%d\n",
  217             (int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
  218             keybuf, status_map[le->status], cmd, le->ttl, le->clsid, le->sfd,
  219             le->nbytes > 0 ? le->nbytes - 2 : 0); // CLRF
  220     return total;
  221 }
  222 
  223 static int _logger_parse_ige(logentry *e, char *scratch) {
  224     int total;
  225     struct logentry_item_get *le = (struct logentry_item_get *) e->data;
  226     char keybuf[KEY_MAX_URI_ENCODED_LENGTH];
  227     const char * const was_found_map[] = {
  228         "not_found", "found", "flushed", "expired" };
  229 
  230     uriencode(le->key, keybuf, le->nkey, KEY_MAX_URI_ENCODED_LENGTH);
  231     total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
  232             "ts=%d.%d gid=%llu type=item_get key=%s status=%s clsid=%u cfd=%d size=%d\n",
  233             (int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
  234             keybuf, was_found_map[le->was_found], le->clsid, le->sfd,
  235             le->nbytes > 0 ? le->nbytes - 2 : 0); // CLRF
  236     return total;
  237 }
  238 
  239 static int _logger_parse_ee(logentry *e, char *scratch) {
  240     int total;
  241     char keybuf[KEY_MAX_URI_ENCODED_LENGTH];
  242     struct logentry_eviction *le = (struct logentry_eviction *) e->data;
  243     uriencode(le->key, keybuf, le->nkey, KEY_MAX_URI_ENCODED_LENGTH);
  244     total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
  245             "ts=%d.%d gid=%llu type=eviction key=%s fetch=%s ttl=%lld la=%d clsid=%u size=%d\n",
  246             (int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
  247             keybuf, (le->it_flags & ITEM_FETCHED) ? "yes" : "no",
  248             (long long int)le->exptime, le->latime, le->clsid,
  249             le->nbytes > 0 ? le->nbytes - 2 : 0); // CLRF
  250 
  251     return total;
  252 }
  253 
  254 #ifdef EXTSTORE
  255 static int _logger_parse_extw(logentry *e, char *scratch) {
  256     int total;
  257     char keybuf[KEY_MAX_URI_ENCODED_LENGTH];
  258     struct logentry_ext_write *le = (struct logentry_ext_write *) e->data;
  259     uriencode(le->key, keybuf, le->nkey, KEY_MAX_URI_ENCODED_LENGTH);
  260     total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
  261             "ts=%d.%d gid=%llu type=extwrite key=%s fetch=%s ttl=%lld la=%d clsid=%u bucket=%u\n",
  262             (int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid,
  263             keybuf, (le->it_flags & ITEM_FETCHED) ? "yes" : "no",
  264             (long long int)le->exptime, le->latime, le->clsid, le->bucket);
  265 
  266     return total;
  267 }
  268 #endif
  269 
  270 static int _logger_parse_cne(logentry *e, char *scratch) {
  271     int total;
  272     unsigned short rport;
  273     char rip[64];
  274     struct logentry_conn_event *le = (struct logentry_conn_event *) e->data;
  275     const char * const transport_map[] = { "local", "tcp", "udp" };
  276 
  277     _logger_util_addr_endpoint(&le->addr, rip, sizeof(rip), &rport);
  278 
  279     total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
  280             "ts=%d.%d gid=%llu type=conn_new rip=%s rport=%hu transport=%s cfd=%d\n",
  281             (int) e->tv.tv_sec, (int) e->tv.tv_usec, (unsigned long long) e->gid,
  282             rip, rport, transport_map[le->transport], le->sfd);
  283 
  284     return total;
  285 }
  286 
  287 static int _logger_parse_cce(logentry *e, char *scratch) {
  288     int total;
  289     unsigned short rport;
  290     char rip[64];
  291     struct logentry_conn_event *le = (struct logentry_conn_event *) e->data;
  292     const char * const transport_map[] = { "local", "tcp", "udp" };
  293     const char * const reason_map[] = { "error", "normal", "idle_timeout", "shutdown" };
  294 
  295     _logger_util_addr_endpoint(&le->addr, rip, sizeof(rip), &rport);
  296 
  297     total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
  298             "ts=%d.%d gid=%llu type=conn_close rip=%s rport=%hu transport=%s reason=%s cfd=%d\n",
  299             (int) e->tv.tv_sec, (int) e->tv.tv_usec, (unsigned long long) e->gid,
  300             rip, rport, transport_map[le->transport],
  301             reason_map[le->reason], le->sfd);
  302 
  303     return total;
  304 }
  305 
  306 #ifdef PROXY
  307 static void _logger_log_proxy_raw(logentry *e, const entry_details *d, const void *entry, va_list ap) {
  308     struct timeval start = va_arg(ap, struct timeval);
  309     char *cmd = va_arg(ap, char *);
  310     unsigned short type = va_arg(ap, int);
  311     unsigned short code = va_arg(ap, int);
  312 
  313     struct logentry_proxy_raw *le = (void *)e->data;
  314     struct timeval end;
  315     gettimeofday(&end, NULL);
  316     le->type = type;
  317     le->code = code;
  318     le->elapsed = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec);
  319     memcpy(le->cmd, cmd, 9);
  320     e->size = sizeof(struct logentry_proxy_raw);
  321 }
  322 
  323 static int _logger_parse_prx_raw(logentry *e, char *scratch) {
  324     int total;
  325     struct logentry_proxy_raw *le = (void *)e->data;
  326 
  327     total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
  328             "ts=%d.%d gid=%llu type=proxy_raw elapsed=%lu cmd=%s type=%d code=%d\n",
  329             (int) e->tv.tv_sec, (int) e->tv.tv_usec, (unsigned long long) e->gid,
  330             le->elapsed, le->cmd, le->type, le->code);
  331     return total;
  332 }
  333 #endif
  334 
  335 /* Should this go somewhere else? */
  336 static const entry_details default_entries[] = {
  337     [LOGGER_ASCII_CMD] = {512, LOG_RAWCMDS, _logger_log_text, _logger_parse_text, "<%d %s"},
  338     [LOGGER_EVICTION] = {512, LOG_EVICTIONS, _logger_log_evictions, _logger_parse_ee, NULL},
  339     [LOGGER_ITEM_GET] = {512, LOG_FETCHERS, _logger_log_item_get, _logger_parse_ige, NULL},
  340     [LOGGER_ITEM_STORE] = {512, LOG_MUTATIONS, _logger_log_item_store, _logger_parse_ise, NULL},
  341     [LOGGER_CRAWLER_STATUS] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
  342         "type=lru_crawler crawler=%d lru=%s low_mark=%llu next_reclaims=%llu since_run=%u next_run=%d elapsed=%u examined=%llu reclaimed=%llu"
  343     },
  344     [LOGGER_SLAB_MOVE] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
  345         "type=slab_move src=%d dst=%d"
  346     },
  347     [LOGGER_CONNECTION_NEW] = {512, LOG_CONNEVENTS, _logger_log_conn_event, _logger_parse_cne, NULL},
  348     [LOGGER_CONNECTION_CLOSE] = {512, LOG_CONNEVENTS, _logger_log_conn_event, _logger_parse_cce, NULL},
  349 #ifdef EXTSTORE
  350     [LOGGER_EXTSTORE_WRITE] = {512, LOG_EVICTIONS, _logger_log_ext_write, _logger_parse_extw, NULL},
  351     [LOGGER_COMPACT_START] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
  352         "type=compact_start id=%lu version=%llu"
  353     },
  354     [LOGGER_COMPACT_ABORT] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
  355         "type=compact_abort id=%lu"
  356     },
  357     [LOGGER_COMPACT_READ_START] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
  358         "type=compact_read_start id=%lu offset=%llu"
  359     },
  360     [LOGGER_COMPACT_READ_END] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
  361         "type=compact_read_end id=%lu offset=%llu rescues=%lu lost=%lu skipped=%lu"
  362     },
  363     [LOGGER_COMPACT_END] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
  364         "type=compact_end id=%lu"
  365     },
  366     [LOGGER_COMPACT_FRAGINFO] = {512, LOG_SYSEVENTS, _logger_log_text, _logger_parse_text,
  367         "type=compact_fraginfo ratio=%.2f bytes=%lu"
  368     },
  369 #endif
  370 #ifdef PROXY
  371     [LOGGER_PROXY_CONFIG] = {512, LOG_PROXYEVENTS, _logger_log_text, _logger_parse_text,
  372         "type=proxy_conf status=%s"
  373     },
  374     [LOGGER_PROXY_RAW] = {512, LOG_PROXYCMDS, _logger_log_proxy_raw, _logger_parse_prx_raw, NULL},
  375     [LOGGER_PROXY_ERROR] = {512, LOG_PROXYEVENTS, _logger_log_text, _logger_parse_text,
  376         "type=proxy_error msg=%s"
  377     },
  378     [LOGGER_PROXY_USER] = {512, LOG_PROXYUSER, _logger_log_text, _logger_parse_text,
  379         "type=proxy_user msg=%s"
  380     },
  381     [LOGGER_PROXY_BE_ERROR] = {512, LOG_PROXYEVENTS, _logger_log_text, _logger_parse_text,
  382         "type=proxy_backend error=%s name=%s port=%s"
  383     },
  384 
  385 #endif
  386 };
  387 
  388 /*************************
  389  * Util functions shared between bg thread and workers
  390  *************************/
  391 
  392 /* Logger GID's can be used by watchers to put logs back into strict order
  393  */
  394 static uint64_t logger_gid = 0;
  395 uint64_t logger_get_gid(void) {
  396 #ifdef HAVE_GCC_64ATOMICS
  397     return __sync_add_and_fetch(&logger_gid, 1);
  398 #elif defined(__sun)
  399     return atomic_inc_64_nv(&logger_gid);
  400 #else
  401     mutex_lock(&logger_atomics_mutex);
  402     uint64_t res = ++logger_gid;
  403     mutex_unlock(&logger_atomics_mutex);
  404     return res;
  405 #endif
  406 }
  407 
  408 void logger_set_gid(uint64_t gid) {
  409 #ifdef HAVE_GCC_64ATOMICS
  410     __sync_add_and_fetch(&logger_gid, gid);
  411 #elif defined(__sun)
  412     atomic_add_64(&logger_gid);
  413 #else
  414     mutex_lock(&logger_atomics_mutex);
  415     logger_gid = gid;
  416     mutex_unlock(&logger_atomics_mutex);
  417 #endif
  418 }
  419 
  420 /* TODO: genericize lists. would be nice to import queue.h if the impact is
  421  * studied... otherwise can just write a local one.
  422  */
  423 /* Add to the list of threads with a logger object */
  424 static void logger_link_q(logger *l) {
  425     pthread_mutex_lock(&logger_stack_lock);
  426     assert(l != logger_stack_head);
  427 
  428     l->prev = 0;
  429     l->next = logger_stack_head;
  430     if (l->next) l->next->prev = l;
  431     logger_stack_head = l;
  432     if (logger_stack_tail == 0) logger_stack_tail = l;
  433     logger_count++;
  434     pthread_mutex_unlock(&logger_stack_lock);
  435     return;
  436 }
  437 
  438 /* Remove from the list of threads with a logger object */
  439 /*static void logger_unlink_q(logger *l) {
  440     pthread_mutex_lock(&logger_stack_lock);
  441     if (logger_stack_head == l) {
  442         assert(l->prev == 0);
  443         logger_stack_head = l->next;
  444     }
  445     if (logger_stack_tail == l) {
  446         assert(l->next == 0);
  447         logger_stack_tail = l->prev;
  448     }
  449     assert(l->next != l);
  450     assert(l->prev != l);
  451 
  452     if (l->next) l->next->prev = l->prev;
  453     if (l->prev) l->prev->next = l->next;
  454     logger_count--;
  455     pthread_mutex_unlock(&logger_stack_lock);
  456     return;
  457 }*/
  458 
  459 /* Called with logger stack locked.
  460  * Iterates over every watcher collecting enabled flags.
  461  */
  462 static void logger_set_flags(void) {
  463     logger *l = NULL;
  464     int x = 0;
  465     uint16_t f = 0; /* logger eflags */
  466 
  467     for (x = 0; x < WATCHER_LIMIT; x++) {
  468         logger_watcher *w = watchers[x];
  469         if (w == NULL)
  470             continue;
  471 
  472         f |= w->eflags;
  473     }
  474     for (l = logger_stack_head; l != NULL; l=l->next) {
  475         pthread_mutex_lock(&l->mutex);
  476         l->eflags = f;
  477         pthread_mutex_unlock(&l->mutex);
  478     }
  479     return;
  480 }
  481 
  482 /* Completes rendering of log line. */
  483 static enum logger_parse_entry_ret logger_thread_parse_entry(logentry *e, struct logger_stats *ls,
  484         char *scratch, int *scratch_len) {
  485     int total = 0;
  486     const entry_details *d = &default_entries[e->event];
  487     assert(d->parse_cb != NULL);
  488     total = d->parse_cb(e, scratch);
  489 
  490     if (total >= LOGGER_PARSE_SCRATCH || total <= 0) {
  491         L_DEBUG("LOGGER: Failed to flatten log entry!\n");
  492         return LOGGER_PARSE_ENTRY_FAILED;
  493     } else {
  494         *scratch_len = total;
  495     }
  496 
  497     return LOGGER_PARSE_ENTRY_OK;
  498 }
  499 
  500 /* Writes flattened entry to available watchers */
  501 static void logger_thread_write_entry(logentry *e, struct logger_stats *ls,
  502         char *scratch, int scratch_len) {
  503     int x, total;
  504     /* Write the line into available watchers with matching flags */
  505     for (x = 0; x < WATCHER_LIMIT; x++) {
  506         logger_watcher *w = watchers[x];
  507         char *skip_scr = NULL;
  508         if (w == NULL || (e->eflags & w->eflags) == 0 || (e->gid < w->min_gid))
  509             continue;
  510 
  511          /* Avoid poll()'ing constantly when buffer is full by resetting a
  512          * flag periodically.
  513          */
  514         while (!w->failed_flush &&
  515                 (skip_scr = (char *) bipbuf_request(w->buf, scratch_len + 128)) == NULL) {
  516             if (logger_thread_poll_watchers(0, x) <= 0) {
  517                 L_DEBUG("LOGGER: Watcher had no free space for line of size (%d)\n", scratch_len + 128);
  518                 w->failed_flush = true;
  519             }
  520         }
  521 
  522         if (w->failed_flush) {
  523             L_DEBUG("LOGGER: Fast skipped for watcher [%d] due to failed_flush\n", w->sfd);
  524             w->skipped++;
  525             ls->watcher_skipped++;
  526             continue;
  527         }
  528 
  529         if (w->skipped > 0) {
  530             total = snprintf(skip_scr, 128, "skipped=%llu\n", (unsigned long long) w->skipped);
  531             if (total >= 128 || total <= 0) {
  532                 L_DEBUG("LOGGER: Failed to flatten skipped message into watcher [%d]\n", w->sfd);
  533                 w->skipped++;
  534                 ls->watcher_skipped++;
  535                 continue;
  536             }
  537             bipbuf_push(w->buf, total);
  538             w->skipped = 0;
  539         }
  540         /* Can't fail because bipbuf_request succeeded. */
  541         bipbuf_offer(w->buf, (unsigned char *) scratch, scratch_len);
  542         ls->watcher_sent++;
  543     }
  544 }
  545 
/* Called with logger stack locked.
 * Releases every chunk associated with a watcher and closes the connection.
 * We can't presently send a connection back to the worker for further
 * processing.
 *
 * After this returns `w` has been freed and its slot in watchers[] is NULL;
 * callers must not dereference `w` again.
 */
static void logger_thread_close_watcher(logger_watcher *w) {
    L_DEBUG("LOGGER: Closing dead watcher\n");
    /* Clear the slot first so later scans of watchers[] skip it. */
    watchers[w->id] = NULL;
    sidethread_conn_close(w->c);
    watcher_count--;
    bipbuf_free(w->buf);
    free(w);
    /* Recompute the combined flags now that this watcher is gone. */
    logger_set_flags();
}
  560 
  561 /* Reads a particular worker thread's available bipbuf bytes. Parses each log
  562  * entry into the watcher buffers.
  563  */
  564 static int logger_thread_read(logger *l, struct logger_stats *ls) {
  565     unsigned int size;
  566     unsigned int pos = 0;
  567     unsigned char *data;
  568     char scratch[LOGGER_PARSE_SCRATCH];
  569     logentry *e;
  570     pthread_mutex_lock(&l->mutex);
  571     data = bipbuf_peek_all(l->buf, &size);
  572     pthread_mutex_unlock(&l->mutex);
  573 
  574     if (data == NULL) {
  575         return 0;
  576     }
  577     L_DEBUG("LOGGER: Got %d bytes from bipbuffer\n", size);
  578 
  579     /* parse buffer */
  580     while (pos < size && watcher_count > 0) {
  581         enum logger_parse_entry_ret ret;
  582         int scratch_len = 0;
  583         e = (logentry *) (data + pos);
  584         ret = logger_thread_parse_entry(e, ls, scratch, &scratch_len);
  585         if (ret != LOGGER_PARSE_ENTRY_OK) {
  586             /* TODO: stats counter */
  587             fprintf(stderr, "LOGGER: Failed to parse log entry\n");
  588         } else {
  589             logger_thread_write_entry(e, ls, scratch, scratch_len);
  590         }
  591         pos += sizeof(logentry) + e->size + e->pad;
  592     }
  593     assert(pos <= size);
  594 
  595     pthread_mutex_lock(&l->mutex);
  596     data = bipbuf_poll(l->buf, size);
  597     ls->worker_written += l->written;
  598     ls->worker_dropped += l->dropped;
  599     l->written = 0;
  600     l->dropped = 0;
  601     pthread_mutex_unlock(&l->mutex);
  602     if (data == NULL) {
  603         fprintf(stderr, "LOGGER: unexpectedly couldn't advance buf pointer\n");
  604         assert(0);
  605     }
  606     return size; /* maybe the count of objects iterated? */
  607 }
  608 
/* Since the event loop code isn't reusable without a refactor, and we have a
 * limited number of potential watchers, we run our own poll loop.
 * This calls poll() unnecessarily during write flushes, should be possible to
 * micro-optimize later.
 *
 * This flushes buffers attached to watchers, iterating through the bytes set
 * to each worker. Also checks for readability in case client connection was
 * closed.
 *
 * Allows a specific watcher to be flushed (if buf full)
 *
 * force_poll: when non-zero, watchers with nothing buffered are still polled
 *             (for POLLIN) so a remote close can be noticed.
 * watcher:    index of the single watcher to service, or WATCHER_ALL.
 *
 * Returns the total number of bytes flushed to watchers, 0 if there was
 * nothing to poll, or -1 if poll() itself failed.
 *
 * May close (and free) watchers; callers holding a logger_watcher pointer
 * must re-read watchers[] afterwards.
 */
static int logger_thread_poll_watchers(int force_poll, int watcher) {
    int x;
    int nfd = 0;
    unsigned char *data;
    unsigned int data_size = 0;
    int flushed = 0;

    /* Pass 1: build the pollfd list. A watcher gets a slot if it has data to
     * write (POLLOUT) or, failing that, if force_poll is set (POLLIN). */
    for (x = 0; x < WATCHER_LIMIT; x++) {
        logger_watcher *w = watchers[x];
        if (w == NULL || (watcher != WATCHER_ALL && x != watcher))
            continue;

        data = bipbuf_peek_all(w->buf, &data_size);
        if (data != NULL) {
            watchers_pollfds[nfd].fd = w->sfd;
            watchers_pollfds[nfd].events = POLLOUT;
            nfd++;
        } else if (force_poll) {
            watchers_pollfds[nfd].fd = w->sfd;
            watchers_pollfds[nfd].events = POLLIN;
            nfd++;
        }
        /* This gets set after a call to poll, and should be used to gate on
         * calling poll again.
         */
        w->failed_flush = false;
    }

    if (nfd == 0)
        return 0;

    //L_DEBUG("LOGGER: calling poll() [data_size: %d]\n", data_size);
    /* Zero timeout: never block the logger thread here. */
    int ret = poll(watchers_pollfds, nfd, 0);

    if (ret < 0) {
        perror("something failed with logger thread watcher fd polling");
        return -1;
    }

    /* Pass 2: act on the results.
     * NOTE(review): nfd advances here for every selected watcher, while pass 1
     * only assigned a slot to watchers with data or when force_poll was set.
     * The two indices line up when force_poll is non-zero or when a single
     * watcher with pending data is selected; confirm no caller combines
     * force_poll == 0 with WATCHER_ALL. */
    nfd = 0;
    for (x = 0; x < WATCHER_LIMIT; x++) {
        logger_watcher *w = watchers[x];
        if (w == NULL || (watcher != WATCHER_ALL && x != watcher))
            continue;

        data_size = 0;
        /* Early detection of a disconnect. Otherwise we have to wait until
         * the next write
         */
        if (watchers_pollfds[nfd].revents & POLLIN) {
            char buf[1];
            int res = ((conn*)w->c)->read(w->c, buf, 1);
            /* A zero-byte read or a hard error means the peer is gone. */
            if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
                L_DEBUG("LOGGER: watcher closed remotely\n");
                logger_thread_close_watcher(w);
                nfd++;
                continue;
            }
        }
        if ((data = bipbuf_peek_all(w->buf, &data_size)) != NULL) {
            if (watchers_pollfds[nfd].revents & (POLLHUP|POLLERR)) {
                L_DEBUG("LOGGER: watcher closed during poll() call\n");
                logger_thread_close_watcher(w);
            } else if (watchers_pollfds[nfd].revents & POLLOUT) {
                int total = 0;

                /* We can write a bit. */
                switch (w->t) {
                    case LOGGER_WATCHER_STDERR:
                        total = fwrite(data, 1, data_size, stderr);
                        break;
                    case LOGGER_WATCHER_CLIENT:
                        total = ((conn*)w->c)->write(w->c, data, data_size);
                        break;
                }

                L_DEBUG("LOGGER: poll() wrote %d to %d (data_size: %d) (bipbuf_used: %d)\n", total, w->sfd,
                        data_size, bipbuf_used(w->buf));
                if (total == -1) {
                    /* Would-block is retried on a later pass; anything else
                     * drops the watcher. */
                    if (errno != EAGAIN && errno != EWOULDBLOCK) {
                        logger_thread_close_watcher(w);
                    }
                    L_DEBUG("LOGGER: watcher hit EAGAIN\n");
                } else if (total == 0) {
                    logger_thread_close_watcher(w);
                } else {
                    /* Release exactly the bytes that were written. */
                    bipbuf_poll(w->buf, total);
                    flushed += total;
                }
            }
        }
        nfd++;
    }
    return flushed;
}
  715 
  716 static void logger_thread_flush_stats(struct logger_stats *ls) {
  717     STATS_LOCK();
  718     stats.log_worker_dropped  += ls->worker_dropped;
  719     stats.log_worker_written  += ls->worker_written;
  720     stats.log_watcher_skipped += ls->watcher_skipped;
  721     stats.log_watcher_sent    += ls->watcher_sent;
  722     stats_state.log_watchers   = ls->watcher_count;
  723     STATS_UNLOCK();
  724 }
  725 
  726 #define MAX_LOGGER_SLEEP 1000000
  727 #define MIN_LOGGER_SLEEP 1000
  728 
  729 /* Primary logger thread routine */
  730 static void *logger_thread(void *arg) {
  731     useconds_t to_sleep = MIN_LOGGER_SLEEP;
  732     L_DEBUG("LOGGER: Starting logger thread\n");
  733     // TODO: If we ever have item references in the logger code, will need to
  734     // ensure everything is dequeued before stopping the thread.
  735     while (do_run_logger_thread) {
  736         int found_logs = 0;
  737         logger *l;
  738         struct logger_stats ls;
  739         memset(&ls, 0, sizeof(struct logger_stats));
  740 
  741         /* only sleep if we're *above* the minimum */
  742         if (to_sleep > MIN_LOGGER_SLEEP)
  743             usleep(to_sleep);
  744 
  745         /* Call function to iterate each logger. */
  746         pthread_mutex_lock(&logger_stack_lock);
  747         if (watcher_count == 0) {
  748             // Not bothering to loop on the condition here since it's fine to
  749             // walk through with zero watchers.
  750             pthread_cond_wait(&logger_stack_cond, &logger_stack_lock);
  751         }
  752         for (l = logger_stack_head; l != NULL; l=l->next) {
  753             /* lock logger, call function to manipulate it */
  754             found_logs += logger_thread_read(l, &ls);
  755         }
  756 
  757         logger_thread_poll_watchers(1, WATCHER_ALL);
  758 
  759         /* capture the current count within mutual exclusion of the lock */
  760         ls.watcher_count = watcher_count;
  761 
  762         pthread_mutex_unlock(&logger_stack_lock);
  763 
  764         /* TODO: abstract into a function and share with lru_crawler */
  765         if (!found_logs) {
  766             if (to_sleep < MAX_LOGGER_SLEEP)
  767                 to_sleep += to_sleep / 8;
  768             if (to_sleep > MAX_LOGGER_SLEEP)
  769                 to_sleep = MAX_LOGGER_SLEEP;
  770         } else {
  771             to_sleep /= 2;
  772             if (to_sleep < MIN_LOGGER_SLEEP)
  773                 to_sleep = MIN_LOGGER_SLEEP;
  774         }
  775         logger_thread_flush_stats(&ls);
  776     }
  777 
  778     return NULL;
  779 }
  780 
  781 static int start_logger_thread(void) {
  782     int ret;
  783     do_run_logger_thread = 1;
  784     if ((ret = pthread_create(&logger_tid, NULL,
  785                               logger_thread, NULL)) != 0) {
  786         fprintf(stderr, "Can't start logger thread: %s\n", strerror(ret));
  787         return -1;
  788     }
  789     return 0;
  790 }
  791 
  792 static int stop_logger_thread(void) {
  793     do_run_logger_thread = 0;
  794     pthread_cond_signal(&logger_stack_cond);
  795     pthread_join(logger_tid, NULL);
  796     return 0;
  797 }
  798 
  799 /*************************
  800  * Public functions for submitting logs and starting loggers from workers.
  801  *************************/
  802 
  803 /* Global logger thread start/init */
  804 void logger_init(void) {
  805     /* TODO: auto destructor when threads exit */
  806     /* TODO: error handling */
  807 
  808     /* init stack for iterating loggers */
  809     logger_stack_head = 0;
  810     logger_stack_tail = 0;
  811     pthread_key_create(&logger_key, NULL);
  812 
  813     if (start_logger_thread() != 0) {
  814         abort();
  815     }
  816 
  817     /* This is what adding a STDERR watcher looks like. should replace old
  818      * "verbose" settings. */
  819     //logger_add_watcher(NULL, 0);
  820     return;
  821 }
  822 
  823 void logger_stop(void) {
  824     stop_logger_thread();
  825 }
  826 
  827 /* called *from* the thread using a logger.
  828  * initializes the per-thread bipbuf, links it into the list of loggers
  829  */
  830 logger *logger_create(void) {
  831     L_DEBUG("LOGGER: Creating and linking new logger instance\n");
  832     logger *l = calloc(1, sizeof(logger));
  833     if (l == NULL) {
  834         return NULL;
  835     }
  836 
  837     l->buf = bipbuf_new(settings.logger_buf_size);
  838     if (l->buf == NULL) {
  839         free(l);
  840         return NULL;
  841     }
  842 
  843     l->entry_map = default_entries;
  844 
  845     pthread_mutex_init(&l->mutex, NULL);
  846     pthread_setspecific(logger_key, l);
  847 
  848     /* add to list of loggers */
  849     logger_link_q(l);
  850     return l;
  851 }
  852 
  853 /* Public function for logging an entry.
  854  * Tries to encapsulate as much of the formatting as possible to simplify the
  855  * caller's code.
  856  */
  857 enum logger_ret_type logger_log(logger *l, const enum log_entry_type event, const void *entry, ...) {
  858     bipbuf_t *buf = l->buf;
  859     bool nospace = false;
  860     va_list ap;
  861     logentry *e;
  862 
  863     const entry_details *d = &l->entry_map[event];
  864     int reqlen = d->reqlen;
  865 
  866     pthread_mutex_lock(&l->mutex);
  867     /* Request a maximum length of data to write to */
  868     e = (logentry *) bipbuf_request(buf, (sizeof(logentry) + reqlen));
  869     if (e == NULL) {
  870         pthread_mutex_unlock(&l->mutex);
  871         l->dropped++;
  872         return LOGGER_RET_NOSPACE;
  873     }
  874     e->event = event;
  875     e->pad = 0;
  876     e->gid = logger_get_gid();
  877     /* TODO: Could pass this down as an argument now that we're using
  878      * LOGGER_LOG() macro.
  879      */
  880     e->eflags = d->eflags;
  881     /* Noting time isn't optional. A feature may be added to avoid rendering
  882      * time and/or gid to a logger.
  883      */
  884     gettimeofday(&e->tv, NULL);
  885 
  886     va_start(ap, entry);
  887     d->log_cb(e, d, entry, ap);
  888     va_end(ap);
  889 
  890 #ifdef NEED_ALIGN
  891     /* Need to ensure *next* request is aligned. */
  892     if (sizeof(logentry) + e->size % 8 != 0) {
  893         e->pad = 8 - (sizeof(logentry) + e->size % 8);
  894     }
  895 #endif
  896 
  897     /* Push pointer forward by the actual amount required */
  898     if (bipbuf_push(buf, (sizeof(logentry) + e->size + e->pad)) == 0) {
  899         fprintf(stderr, "LOGGER: Failed to bipbuf push a text entry\n");
  900         pthread_mutex_unlock(&l->mutex);
  901         return LOGGER_RET_ERR;
  902     }
  903     l->written++;
  904     L_DEBUG("LOGGER: Requested %d bytes, wrote %lu bytes\n", reqlen,
  905             (sizeof(logentry) + e->size));
  906 
  907     pthread_mutex_unlock(&l->mutex);
  908 
  909     if (nospace) {
  910         return LOGGER_RET_NOSPACE;
  911     } else {
  912         return LOGGER_RET_OK;
  913     }
  914 }
  915 
  916 /* Passes a client connection socket from a primary worker thread to the
  917  * logger thread. Caller *must* event_del() the client before handing it over.
  918  * Presently there's no way to hand the client back to the worker thread.
  919  */
  920 enum logger_add_watcher_ret logger_add_watcher(void *c, const int sfd, uint16_t f) {
  921     int x;
  922     logger_watcher *w = NULL;
  923     pthread_mutex_lock(&logger_stack_lock);
  924     if (watcher_count >= WATCHER_LIMIT) {
  925         pthread_mutex_unlock(&logger_stack_lock);
  926         return LOGGER_ADD_WATCHER_TOO_MANY;
  927     }
  928 
  929     for (x = 0; x < WATCHER_LIMIT-1; x++) {
  930         if (watchers[x] == NULL)
  931             break;
  932     }
  933 
  934     w = calloc(1, sizeof(logger_watcher));
  935     if (w == NULL) {
  936         pthread_mutex_unlock(&logger_stack_lock);
  937         return LOGGER_ADD_WATCHER_FAILED;
  938     }
  939     w->c = c;
  940     w->sfd = sfd;
  941     if (sfd == 0 && c == NULL) {
  942         w->t = LOGGER_WATCHER_STDERR;
  943     } else {
  944         w->t = LOGGER_WATCHER_CLIENT;
  945     }
  946     w->id = x;
  947     w->eflags = f;
  948     w->min_gid = logger_get_gid();
  949     w->buf = bipbuf_new(settings.logger_watcher_buf_size);
  950     if (w->buf == NULL) {
  951         free(w);
  952         pthread_mutex_unlock(&logger_stack_lock);
  953         return LOGGER_ADD_WATCHER_FAILED;
  954     }
  955     bipbuf_offer(w->buf, (unsigned char *) "OK\r\n", 4);
  956 
  957     watchers[x] = w;
  958     watcher_count++;
  959     /* Update what flags the global logs will watch */
  960     logger_set_flags();
  961     pthread_cond_signal(&logger_stack_cond);
  962 
  963     pthread_mutex_unlock(&logger_stack_lock);
  964     return LOGGER_ADD_WATCHER_OK;
  965 }