"Fossies" - the Fresh Open Source Software Archive

Member "memcached-1.6.15/proto_proxy.c" (30 Mar 2022, 33141 Bytes) of package /linux/www/memcached-1.6.15.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "proto_proxy.c" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 1.6.14_vs_1.6.15.

    1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
    2 /*
    3  * Functions for handling the proxy layer. wraps text protocols
    4  *
    5  * NOTE: many lua functions generate pointers via "lua_newuserdatauv" or
    6  * similar. Normal memory checking isn't done as lua will throw a high level
    7  * error if malloc fails. Must keep this in mind while allocating data so any
    8  * manually malloc'ed information gets freed properly.
    9  */
   10 
   11 #include "proxy.h"
   12 
   13 #define PROCESS_MULTIGET true
   14 #define PROCESS_NORMAL false
   15 static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool multiget);
   16 static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc);
   17 static void proxy_out_errstring(mc_resp *resp, const char *str);
   18 
   19 /******** EXTERNAL FUNCTIONS ******/
   20 // functions starting with _ are breakouts for the public functions.
   21 
   22 // see also: process_extstore_stats()
   23 // FIXME (v2): get context off of conn? global variables
   24 void proxy_stats(ADD_STAT add_stats, conn *c) {
   25     if (!settings.proxy_enabled) {
   26        return;
   27     }
   28     proxy_ctx_t *ctx = settings.proxy_ctx;
   29     STAT_L(ctx);
   30 
   31     APPEND_STAT("proxy_config_reloads", "%llu", (unsigned long long)ctx->global_stats.config_reloads);
   32     APPEND_STAT("proxy_config_reload_fails", "%llu", (unsigned long long)ctx->global_stats.config_reload_fails);
   33     APPEND_STAT("proxy_backend_total", "%llu", (unsigned long long)ctx->global_stats.backend_total);
   34     APPEND_STAT("proxy_backend_marked_bad", "%llu", (unsigned long long)ctx->global_stats.backend_marked_bad);
   35     APPEND_STAT("proxy_backend_failed", "%llu", (unsigned long long)ctx->global_stats.backend_failed);
   36     STAT_UL(ctx);
   37 }
   38 
   39 void process_proxy_stats(ADD_STAT add_stats, conn *c) {
   40     char key_str[STAT_KEY_LEN];
   41     struct proxy_int_stats istats = {0};
   42 
   43     if (!settings.proxy_enabled) {
   44         return;
   45     }
   46     proxy_ctx_t *ctx = settings.proxy_ctx;
   47     STAT_L(ctx);
   48 
   49     // prepare aggregated counters.
   50     struct proxy_user_stats *us = &ctx->user_stats;
   51     uint64_t counters[us->num_stats];
   52     memset(counters, 0, sizeof(counters));
   53 
   54     // aggregate worker thread counters.
   55     for (int x = 0; x < settings.num_threads; x++) {
   56         LIBEVENT_THREAD *t = get_worker_thread(x);
   57         struct proxy_user_stats *tus = t->proxy_user_stats;
   58         struct proxy_int_stats *is = t->proxy_int_stats;
   59         WSTAT_L(t);
   60         for (int i = 0; i < CMD_FINAL; i++) {
   61             istats.counters[i] += is->counters[i];
   62         }
   63         if (tus && tus->num_stats >= us->num_stats) {
   64             for (int i = 0; i < us->num_stats; i++) {
   65                 counters[i] += tus->counters[i];
   66             }
   67         }
   68         WSTAT_UL(t);
   69     }
   70 
   71     // return all of the user generated stats
   72     for (int x = 0; x < us->num_stats; x++) {
   73         snprintf(key_str, STAT_KEY_LEN-1, "user_%s", us->names[x]);
   74         APPEND_STAT(key_str, "%llu", (unsigned long long)counters[x]);
   75     }
   76     STAT_UL(ctx);
   77 
   78     // return proxy counters
   79     APPEND_STAT("cmd_mg", "%llu", (unsigned long long)istats.counters[CMD_MG]);
   80     APPEND_STAT("cmd_ms", "%llu", (unsigned long long)istats.counters[CMD_MS]);
   81     APPEND_STAT("cmd_md", "%llu", (unsigned long long)istats.counters[CMD_MD]);
   82     APPEND_STAT("cmd_mn", "%llu", (unsigned long long)istats.counters[CMD_MN]);
   83     APPEND_STAT("cmd_ma", "%llu", (unsigned long long)istats.counters[CMD_MA]);
   84     APPEND_STAT("cmd_me", "%llu", (unsigned long long)istats.counters[CMD_ME]);
   85     APPEND_STAT("cmd_get", "%llu", (unsigned long long)istats.counters[CMD_GET]);
   86     APPEND_STAT("cmd_gat", "%llu", (unsigned long long)istats.counters[CMD_GAT]);
   87     APPEND_STAT("cmd_set", "%llu", (unsigned long long)istats.counters[CMD_SET]);
   88     APPEND_STAT("cmd_add", "%llu", (unsigned long long)istats.counters[CMD_ADD]);
   89     APPEND_STAT("cmd_cas", "%llu", (unsigned long long)istats.counters[CMD_CAS]);
   90     APPEND_STAT("cmd_gets", "%llu", (unsigned long long)istats.counters[CMD_GETS]);
   91     APPEND_STAT("cmd_gats", "%llu", (unsigned long long)istats.counters[CMD_GATS]);
   92     APPEND_STAT("cmd_incr", "%llu", (unsigned long long)istats.counters[CMD_INCR]);
   93     APPEND_STAT("cmd_decr", "%llu", (unsigned long long)istats.counters[CMD_DECR]);
   94     APPEND_STAT("cmd_touch", "%llu", (unsigned long long)istats.counters[CMD_TOUCH]);
   95     APPEND_STAT("cmd_append", "%llu", (unsigned long long)istats.counters[CMD_APPEND]);
   96     APPEND_STAT("cmd_prepend", "%llu", (unsigned long long)istats.counters[CMD_PREPEND]);
   97     APPEND_STAT("cmd_delete", "%llu", (unsigned long long)istats.counters[CMD_DELETE]);
   98     APPEND_STAT("cmd_replace", "%llu", (unsigned long long)istats.counters[CMD_REPLACE]);
   99 }
  100 
  101 // start the centralized lua state and config thread.
  102 // TODO (v2): return ctx ptr. avoid global vars.
  103 void proxy_init(bool use_uring) {
  104     proxy_ctx_t *ctx = calloc(1, sizeof(proxy_ctx_t));
  105     settings.proxy_ctx = ctx;
  106     ctx->use_uring = use_uring;
  107 
  108     pthread_mutex_init(&ctx->config_lock, NULL);
  109     pthread_cond_init(&ctx->config_cond, NULL);
  110     pthread_mutex_init(&ctx->worker_lock, NULL);
  111     pthread_cond_init(&ctx->worker_cond, NULL);
  112     pthread_mutex_init(&ctx->manager_lock, NULL);
  113     pthread_cond_init(&ctx->manager_cond, NULL);
  114     pthread_mutex_init(&ctx->stats_lock, NULL);
  115 
  116     // FIXME (v2): default defines.
  117     ctx->tunables.tcp_keepalive = false;
  118     ctx->tunables.backend_failure_limit = 3;
  119     ctx->tunables.connect.tv_sec = 5;
  120     ctx->tunables.retry.tv_sec = 3;
  121     ctx->tunables.read.tv_sec = 3;
  122 #ifdef HAVE_LIBURING
  123     ctx->tunables.connect_ur.tv_sec = 5;
  124     ctx->tunables.retry_ur.tv_sec = 3;
  125     ctx->tunables.read_ur.tv_sec = 3;
  126 #endif // HAVE_LIBURING
  127 
  128     STAILQ_INIT(&ctx->manager_head);
  129     lua_State *L = luaL_newstate();
  130     ctx->proxy_state = L;
  131     luaL_openlibs(L);
  132     // NOTE: might need to differentiate the libs yes?
  133     proxy_register_libs(NULL, L);
  134 
  135     // Create/start the backend threads, which we need before servers
  136     // start getting created.
  137     // Supporting N event threads should be possible, but it will be a
  138     // low number of N to avoid too many wakeup syscalls.
  139     // For now we hardcode to 1.
  140     proxy_event_thread_t *threads = calloc(1, sizeof(proxy_event_thread_t));
  141     ctx->proxy_threads = threads;
  142     for (int i = 0; i < 1; i++) {
  143         proxy_event_thread_t *t = &threads[i];
  144         t->ctx = ctx;
  145 #ifdef USE_EVENTFD
  146         t->event_fd = eventfd(0, EFD_NONBLOCK);
  147         if (t->event_fd == -1) {
  148             perror("failed to create backend notify eventfd");
  149             exit(1);
  150         }
  151 #else
  152         int fds[2];
  153         if (pipe(fds)) {
  154             perror("can't create proxy backend notify pipe");
  155             exit(1);
  156         }
  157 
  158         t->notify_receive_fd = fds[0];
  159         t->notify_send_fd = fds[1];
  160 #endif
  161         proxy_init_evthread_events(t);
  162 
  163         // incoming request queue.
  164         STAILQ_INIT(&t->io_head_in);
  165         pthread_mutex_init(&t->mutex, NULL);
  166         pthread_cond_init(&t->cond, NULL);
  167 
  168         memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables));
  169 
  170 #ifdef HAVE_LIBURING
  171         if (t->use_uring) {
  172             pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t);
  173         } else {
  174             pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
  175         }
  176 #else
  177         pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
  178 #endif // HAVE_LIBURING
  179     }
  180 
  181     _start_proxy_config_threads(ctx);
  182 }
  183 
  184 // Initialize the VM for an individual worker thread.
  185 void proxy_thread_init(LIBEVENT_THREAD *thr) {
  186     // Create the hook table.
  187     thr->proxy_hooks = calloc(CMD_SIZE, sizeof(struct proxy_hook));
  188     if (thr->proxy_hooks == NULL) {
  189         fprintf(stderr, "Failed to allocate proxy hooks\n");
  190         exit(EXIT_FAILURE);
  191     }
  192     thr->proxy_int_stats = calloc(1, sizeof(struct proxy_int_stats));
  193     if (thr->proxy_int_stats == NULL) {
  194         fprintf(stderr, "Failed to allocate proxy thread stats\n");
  195         exit(EXIT_FAILURE);
  196     }
  197 
  198     // Initialize the lua state.
  199     lua_State *L = luaL_newstate();
  200     thr->L = L;
  201     luaL_openlibs(L);
  202     proxy_register_libs(thr, L);
  203 
  204     // kick off the configuration.
  205     if (proxy_thread_loadconf(thr) != 0) {
  206         exit(EXIT_FAILURE);
  207     }
  208 }
  209 
  210 // ctx_stack is a stack of io_pending_proxy_t's.
  211 void proxy_submit_cb(io_queue_t *q) {
  212     proxy_event_thread_t *e = ((proxy_ctx_t *)q->ctx)->proxy_threads;
  213     io_pending_proxy_t *p = q->stack_ctx;
  214     io_head_t head;
  215     STAILQ_INIT(&head);
  216 
  217     // NOTE: responses get returned in the correct order no matter what, since
  218     // mc_resp's are linked.
  219     // we just need to ensure stuff is parsed off the backend in the correct
  220     // order.
  221     // So we can do with a single list here, but we need to repair the list as
  222     // responses are parsed. (in the req_remaining-- section)
  223     // TODO (v2):
  224     // - except we can't do that because the deferred IO stack isn't
  225     // compatible with queue.h.
  226     // So for now we build the secondary list with an STAILQ, which
  227     // can be transplanted/etc.
  228     while (p) {
  229         // insert into tail so head is oldest request.
  230         STAILQ_INSERT_TAIL(&head, p, io_next);
  231         if (p->is_await) {
  232             // need to not count await objects multiple times.
  233             if (p->await_first) {
  234                 q->count++;
  235             }
  236             // funny workaround: awaiting IOP's don't count toward
  237             // resuming a connection, only the completion of the await
  238             // condition.
  239         } else {
  240             q->count++;
  241         }
  242 
  243         p = p->next;
  244     }
  245 
  246     // clear out the submit queue so we can re-queue new IO's inline.
  247     q->stack_ctx = NULL;
  248 
  249     // Transfer request stack to event thread.
  250     pthread_mutex_lock(&e->mutex);
  251     STAILQ_CONCAT(&e->io_head_in, &head);
  252     // No point in holding the lock since we're not doing a cond signal.
  253     pthread_mutex_unlock(&e->mutex);
  254 
  255     // Signal to check queue.
  256 #ifdef USE_EVENTFD
  257     uint64_t u = 1;
  258     // TODO (v2): check result? is it ever possible to get a short write/failure
  259     // for an eventfd?
  260     if (write(e->event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
  261         assert(1 == 0);
  262     }
  263 #else
  264     if (write(e->notify_send_fd, "w", 1) <= 0) {
  265         assert(1 == 0);
  266     }
  267 #endif
  268 
  269     return;
  270 }
  271 
  272 void proxy_complete_cb(io_queue_t *q) {
  273     // empty/unused.
  274 }
  275 
  276 // called from worker thread after an individual IO has been returned back to
  277 // the worker thread. Do post-IO run and cleanup work.
  278 void proxy_return_cb(io_pending_t *pending) {
  279     io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
  280     if (p->is_await) {
  281         mcplib_await_return(p);
  282     } else {
  283         lua_State *Lc = p->coro;
  284 
  285         // in order to resume we need to remove the objects that were
  286         // originally returned
  287         // what's currently on the top of the stack is what we want to keep.
  288         lua_rotate(Lc, 1, 1);
  289         // We kept the original results from the yield so lua would not
  290         // collect them in the meantime. We can drop those now.
  291         lua_settop(Lc, 1);
  292 
  293         // p can be freed/changed from the call below, so fetch the queue now.
  294         io_queue_t *q = conn_io_queue_get(p->c, p->io_queue_type);
  295         conn *c = p->c;
  296         proxy_run_coroutine(Lc, p->resp, p, c);
  297 
  298         q->count--;
  299         if (q->count == 0) {
  300             // call re-add directly since we're already in the worker thread.
  301             conn_worker_readd(c);
  302         }
  303     }
  304 }
  305 
  306 // called from the worker thread as an mc_resp is being freed.
  307 // must let go of the coroutine reference if there is one.
  308 // caller frees the pending IO.
  309 void proxy_finalize_cb(io_pending_t *pending) {
  310     io_pending_proxy_t *p = (io_pending_proxy_t *)pending;
  311 
  312     // release our coroutine reference.
  313     // TODO (v2): coroutines are reusable in lua 5.4. we can stack this onto a freelist
  314     // after a lua_resetthread(Lc) call.
  315     if (p->coro_ref) {
  316         // Note: lua registry is the same for main thread or a coroutine.
  317         luaL_unref(p->coro, LUA_REGISTRYINDEX, p->coro_ref);
  318     }
  319     return;
  320 }
  321 
  322 int try_read_command_proxy(conn *c) {
  323     char *el, *cont;
  324 
  325     if (c->rbytes == 0)
  326         return 0;
  327 
  328     el = memchr(c->rcurr, '\n', c->rbytes);
  329     if (!el) {
  330         if (c->rbytes > 1024) {
  331             /*
  332              * We didn't have a '\n' in the first k. This _has_ to be a
  333              * large multiget, if not we should just nuke the connection.
  334              */
  335             char *ptr = c->rcurr;
  336             while (*ptr == ' ') { /* ignore leading whitespaces */
  337                 ++ptr;
  338             }
  339 
  340             if (ptr - c->rcurr > 100 ||
  341                 (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {
  342 
  343                 conn_set_state(c, conn_closing);
  344                 return 1;
  345             }
  346 
  347             // ASCII multigets are unbound, so our fixed size rbuf may not
  348             // work for this particular workload... For backcompat we'll use a
  349             // malloc/realloc/free routine just for this.
  350             if (!c->rbuf_malloced) {
  351                 if (!rbuf_switch_to_malloc(c)) {
  352                     conn_set_state(c, conn_closing);
  353                     return 1;
  354                 }
  355             }
  356         }
  357 
  358         return 0;
  359     }
  360     cont = el + 1;
  361 
  362     assert(cont <= (c->rcurr + c->rbytes));
  363 
  364     c->last_cmd_time = current_time;
  365     proxy_process_command(c, c->rcurr, cont - c->rcurr, PROCESS_NORMAL);
  366 
  367     c->rbytes -= (cont - c->rcurr);
  368     c->rcurr = cont;
  369 
  370     assert(c->rcurr <= (c->rbuf + c->rsize));
  371 
  372     return 1;
  373 
  374 }
  375 
  376 // Called when a connection is closed while in nread state reading a set
  377 // Must only be called with an active coroutine.
  378 void proxy_cleanup_conn(conn *c) {
  379     assert(c->proxy_coro_ref != 0);
  380     LIBEVENT_THREAD *thr = c->thread;
  381     lua_State *L = thr->L;
  382     luaL_unref(L, LUA_REGISTRYINDEX, c->proxy_coro_ref);
  383     c->proxy_coro_ref = 0;
  384     WSTAT_DECR(c, proxy_req_active, 1);
  385 }
  386 
  387 // we buffered a SET of some kind.
  388 void complete_nread_proxy(conn *c) {
  389     assert(c != NULL);
  390 
  391     LIBEVENT_THREAD *thr = c->thread;
  392     lua_State *L = thr->L;
  393 
  394     if (c->proxy_coro_ref == 0) {
  395         complete_nread_ascii(c);
  396         return;
  397     }
  398 
  399     conn_set_state(c, conn_new_cmd);
  400 
  401     // Grab our coroutine.
  402     lua_rawgeti(L, LUA_REGISTRYINDEX, c->proxy_coro_ref);
  403     luaL_unref(L, LUA_REGISTRYINDEX, c->proxy_coro_ref);
  404     lua_State *Lc = lua_tothread(L, -1);
  405     mcp_request_t *rq = luaL_checkudata(Lc, -1, "mcp.request");
  406 
  407     // validate the data chunk.
  408     if (strncmp((char *)c->item + rq->pr.vlen - 2, "\r\n", 2) != 0) {
  409         lua_settop(L, 0); // clear anything remaining on the main thread.
  410         // FIXME (v2): need to set noreply false if mset_res, but that's kind
  411         // of a weird hack to begin with. Evaluate how to best do that here.
  412         out_string(c, "CLIENT_ERROR bad data chunk");
  413         return;
  414     }
  415 
  416     // We move ownership of the c->item buffer from the connection to the
  417     // request object here. Else we can double free if the conn closes while
  418     // inside nread.
  419     rq->pr.vbuf = c->item;
  420     c->item = NULL;
  421     c->item_malloced = false;
  422     c->proxy_coro_ref = 0;
  423 
  424     proxy_run_coroutine(Lc, c->resp, NULL, c);
  425 
  426     lua_settop(L, 0); // clear anything remaining on the main thread.
  427 
  428     return;
  429 }
  430 
  431 // Simple error wrapper for common failures.
  432 // lua_error() is a jump so this function never returns
  433 // for clarity add a 'return' after calls to this.
  434 void proxy_lua_error(lua_State *L, const char *s) {
  435     lua_pushstring(L, s);
  436     lua_error(L);
  437 }
  438 
  439 void proxy_lua_ferror(lua_State *L, const char *fmt, ...) {
  440     va_list ap;
  441     va_start(ap, fmt);
  442     lua_pushfstring(L, fmt, ap);
  443     va_end(ap);
  444     lua_error(L);
  445 }
  446 
  447 // Need a custom function so we can prefix lua strings easily.
  448 static void proxy_out_errstring(mc_resp *resp, const char *str) {
  449     size_t len;
  450     const static char error_prefix[] = "SERVER_ERROR ";
  451     const static int error_prefix_len = sizeof(error_prefix) - 1;
  452 
  453     assert(resp != NULL);
  454 
  455     resp_reset(resp);
  456     // avoid noreply since we're throwing important errors.
  457 
  458     // Fill response object with static string.
  459     len = strlen(str);
  460     if ((len + error_prefix_len + 2) > WRITE_BUFFER_SIZE) {
  461         /* ought to be always enough. just fail for simplicity */
  462         str = "SERVER_ERROR output line too long";
  463         len = strlen(str);
  464     }
  465 
  466     char *w = resp->wbuf;
  467     memcpy(w, error_prefix, error_prefix_len);
  468     w += error_prefix_len;
  469 
  470     memcpy(w, str, len);
  471     w += len;
  472 
  473     memcpy(w, "\r\n", 2);
  474     resp_add_iov(resp, resp->wbuf, len + error_prefix_len + 2);
  475     return;
  476 }
  477 
  478 // NOTE: See notes in mcp_queue_io; the secondary problem with setting the
  479 // noreply mode from the response object is that the proxy can return strings
  480 // manually, so we have no way to obey what the original request wanted in
  481 // that case.
  482 static void _set_noreply_mode(mc_resp *resp, mcp_resp_t *r) {
  483     switch (r->mode) {
  484         case RESP_MODE_NORMAL:
  485             break;
  486         case RESP_MODE_NOREPLY:
  487             // ascii noreply only threw egregious errors to client
  488             if (r->status == MCMC_OK) {
  489                 resp->skip = true;
  490             }
  491             break;
  492         case RESP_MODE_METAQUIET:
  493             if (r->resp.code == MCMC_CODE_MISS) {
  494                 resp->skip = true;
  495             } else if (r->cmd[1] != 'g' && r->resp.code == MCMC_CODE_OK) {
  496                 // FIXME (v2): mcmc's parser needs to help us out a bit more
  497                 // here.
  498                 // This is a broken case in the protocol though; quiet mode
  499                 // ignores HD for mutations but not get.
  500                 resp->skip = true;
  501             }
  502             break;
  503         default:
  504             assert(1 == 0);
  505     }
  506 }
  507 
// this resumes every yielded coroutine (and re-resumes if necessary).
// called from the worker thread after responses have been pulled from the
// network.
// Flow:
// - the response object should already be on the coroutine stack.
// - fix up the stack.
// - run coroutine.
// - if LUA_YIELD, we need to swap out the pending IO from its mc_resp then call for a queue
// again.
// - if LUA_OK finalize the response and return
// - else set error into mc_resp.
//
// Parameters:
// - Lc:   the coroutine to resume. It is resumed with one argument, taken
//         from the top of its stack.
// - resp: the response object filled in when the coroutine finishes or
//         errors.
// - p:    the pending IO being completed. It is NULL when the worker calls
//         this directly (e.g. complete_nread_proxy()). When p is non-NULL
//         and the coroutine yields again, p is returned to the IO cache here
//         and must not be used by the caller afterwards.
// - c:    the client connection. In the yield paths it is re-read from p->c
//         when p is non-NULL.
// When p is NULL and the coroutine yields, the coroutine object is expected
// to be on top of c->thread->L's stack. luaL_ref() takes a registry
// reference to it, which also pops it.
// Returns 0 in all cases.
int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, conn *c) {
    int nresults = 0;
    // cores: status returned by lua_resume (LUA_OK, LUA_YIELD or an error).
    int cores = lua_resume(Lc, NULL, 1, &nresults);
    size_t rlen = 0;

    if (cores == LUA_OK) {
        // The coroutine finished: the request is no longer active.
        WSTAT_DECR(c, proxy_req_active, 1);
        int type = lua_type(Lc, -1);
        if (type == LUA_TUSERDATA) {
            mcp_resp_t *r = luaL_checkudata(Lc, -1, "mcp.response");
            LOGGER_LOG(NULL, LOG_PROXYCMDS, LOGGER_PROXY_RAW, NULL, r->start, r->cmd, r->resp.type, r->resp.code);
            _set_noreply_mode(resp, r);
            if (r->buf) {
                // response set from C.
                // FIXME (v2): write_and_free() ? it's a bit wrong for here.
                // Ownership of r->buf moves to resp (write_and_free); r->buf
                // is cleared so it is not released twice.
                resp->write_and_free = r->buf;
                resp_add_iov(resp, r->buf, r->blen);
                r->buf = NULL;
            } else if (lua_getiuservalue(Lc, -1, 1) != LUA_TNIL) {
                // uservalue slot 1 is pre-created, so we get TNIL instead of
                // TNONE when nothing was set into it.
                // NOTE(review): in the TNIL case the pushed nil is left on
                // Lc's stack; looks harmless since the coroutine has
                // finished — confirm.
                // NOTE(review): values longer than WRITE_BUFFER_SIZE are
                // silently truncated here.
                const char *s = lua_tolstring(Lc, -1, &rlen);
                size_t l = rlen > WRITE_BUFFER_SIZE ? WRITE_BUFFER_SIZE : rlen;
                memcpy(resp->wbuf, s, l);
                resp_add_iov(resp, resp->wbuf, l);
                lua_pop(Lc, 1);
            } else if (r->status != MCMC_OK) {
                proxy_out_errstring(resp, "backend failure");
            } else {
                // Empty response: used for ascii multiget emulation.
            }
        } else if (type == LUA_TSTRING) {
            // response is a raw string from lua.
            // NOTE(review): strings longer than WRITE_BUFFER_SIZE are
            // silently truncated here.
            const char *s = lua_tolstring(Lc, -1, &rlen);
            size_t l = rlen > WRITE_BUFFER_SIZE ? WRITE_BUFFER_SIZE : rlen;
            memcpy(resp->wbuf, s, l);
            resp_add_iov(resp, resp->wbuf, l);
            lua_pop(Lc, 1);
        } else {
            proxy_out_errstring(resp, "bad response");
        }
    } else if (cores == LUA_YIELD) {
        if (nresults == 1) {
            // TODO (v2): try harder to validate; but we have so few yield cases
            // that I'm going to shortcut this here. A single yielded result
            // means it's probably an await(), so attempt to process this.
            if (p != NULL) {
                // Save what is still needed from p before returning it to
                // the IO cache. NOTE: this local `resp` shadows the
                // parameter of the same name.
                int coro_ref = p->coro_ref;
                mc_resp *resp = p->resp;
                assert((void *)p == (void *)resp->io_pending);
                resp->io_pending = NULL;
                c = p->c;
                do_cache_free(c->thread->io_cache, p);
                mcplib_await_run(c, resp, Lc, coro_ref);
            } else {
                // coroutine object sitting on the _main_ VM right now, so we grab
                // the reference from there, which also pops it.
                int coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX);
                mcplib_await_run(c, c->resp, Lc, coro_ref);
            }
        } else {
            // need to remove and free the io_pending, since c->resp owns it.
            // so we call mcp_queue_io() again and let it override the
            // mc_resp's io_pending object.

            int coro_ref = 0;
            mc_resp *resp;
            if (p != NULL) {
                coro_ref = p->coro_ref;
                resp = p->resp;
                c = p->c;
                do_cache_free(p->c->thread->io_cache, p);
                // *p is now dead.
            } else {
                // yielding from a top level call to the coroutine,
                // so we need to grab a reference to the coroutine thread.
                // TODO (v2): make this more explicit?
                // we only need to get the reference here, and error conditions
                // should instead drop it, but now it's not obvious to users that
                // we're reaching back into the main thread's stack.
                assert(c != NULL);
                coro_ref = luaL_ref(c->thread->L, LUA_REGISTRYINDEX);
                resp = c->resp;
            }
            // TODO (v2): c only used for cache alloc? push the above into the func?
            mcp_queue_io(c, resp, coro_ref, Lc);
        }
    } else {
        // Any other status is a lua error: the request ends here.
        WSTAT_DECR(c, proxy_req_active, 1);
        P_DEBUG("%s: Failed to run coroutine: %s\n", __func__, lua_tostring(Lc, -1));
        LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_ERROR, NULL, lua_tostring(Lc, -1));
        proxy_out_errstring(resp, "lua failure");
    }

    return 0;
}
  615 
  616 static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool multiget) {
  617     assert(c != NULL);
  618     LIBEVENT_THREAD *thr = c->thread;
  619     struct proxy_hook *hooks = thr->proxy_hooks;
  620     lua_State *L = thr->L;
  621     mcp_parser_t pr = {0};
  622 
  623     // Avoid doing resp_start() here, instead do it a bit later or as-needed.
  624     // This allows us to hop over to the internal text protocol parser, which
  625     // also calls resp_start().
  626     // Tighter integration later should obviate the need for this, it is not a
  627     // permanent solution.
  628     int ret = process_request(&pr, command, cmdlen);
  629     if (ret != 0) {
  630         WSTAT_INCR(c, proxy_conn_errors, 1);
  631         if (!resp_start(c)) {
  632             conn_set_state(c, conn_closing);
  633             return;
  634         }
  635         proxy_out_errstring(c->resp, "parsing request");
  636         if (ret == -2) {
  637             // Kill connection on more critical parse failure.
  638             conn_set_state(c, conn_closing);
  639         }
  640         return;
  641     }
  642 
  643     struct proxy_hook *hook = &hooks[pr.command];
  644 
  645     if (!hook->is_lua) {
  646         // need to pass our command string into the internal handler.
  647         // to minimize the code change, this means allowing it to tokenize the
  648         // full command. The proxy's indirect parser should be built out to
  649         // become common code for both proxy and ascii handlers.
  650         // For now this means we have to null-terminate the command string,
  651         // then call into text protocol handler.
  652         // FIXME (v2): use a ptr or something; don't like this code.
  653         if (cmdlen > 1 && command[cmdlen-2] == '\r') {
  654             command[cmdlen-2] = '\0';
  655         } else {
  656             command[cmdlen-1] = '\0';
  657         }
  658         // lets nread_proxy know we're in ascii mode.
  659         c->proxy_coro_ref = 0;
  660         process_command_ascii(c, command);
  661         return;
  662     }
  663 
  664     // If ascii multiget, we turn this into a self-calling loop :(
  665     // create new request with next key, call this func again, then advance
  666     // original string.
  667     // might be better to split this function; the below bits turn into a
  668     // function call, then we don't re-process the above bits in the same way?
  669     // The way this is detected/passed on is very fragile.
  670     if (!multiget && pr.cmd_type == CMD_TYPE_GET && pr.has_space) {
  671         uint32_t keyoff = pr.tokens[pr.keytoken];
  672         while (pr.klen != 0) {
  673             char temp[KEY_MAX_LENGTH + 30];
  674             char *cur = temp;
  675             // Core daemon can abort the entire command if one key is bad, but
  676             // we cannot from the proxy. Instead we have to inject errors into
  677             // the stream. This should, thankfully, be rare at least.
  678             if (pr.klen > KEY_MAX_LENGTH) {
  679                 if (!resp_start(c)) {
  680                     conn_set_state(c, conn_closing);
  681                     return;
  682                 }
  683                 proxy_out_errstring(c->resp, "key too long");
  684             } else {
  685                 // copy original request up until the original key token.
  686                 memcpy(cur, pr.request, pr.tokens[pr.keytoken]);
  687                 cur += pr.tokens[pr.keytoken];
  688 
  689                 // now copy in our "current" key.
  690                 memcpy(cur, &pr.request[keyoff], pr.klen);
  691                 cur += pr.klen;
  692 
  693                 memcpy(cur, "\r\n", 2);
  694                 cur += 2;
  695 
  696                 *cur = '\0';
  697                 P_DEBUG("%s: new multiget sub request: %s [%u/%u]\n", __func__, temp, keyoff, pr.klen);
  698                 proxy_process_command(c, temp, cur - temp, PROCESS_MULTIGET);
  699             }
  700 
  701             // now advance to the next key.
  702             keyoff = _process_request_next_key(&pr);
  703         }
  704 
  705         if (!resp_start(c)) {
  706             conn_set_state(c, conn_closing);
  707             return;
  708         }
  709 
  710         // The above recursions should have created c->resp's in dispatch
  711         // order.
  712         // So now we add another one at the end to create the capping END
  713         // string.
  714         memcpy(c->resp->wbuf, ENDSTR, ENDLEN);
  715         resp_add_iov(c->resp, c->resp->wbuf, ENDLEN);
  716 
  717         return;
  718     }
  719 
  720     // We test the command length all the way down here because multigets can
  721     // be very long, and they're chopped up by now.
  722     if (cmdlen >= MCP_REQUEST_MAXLEN) {
  723         WSTAT_INCR(c, proxy_conn_errors, 1);
  724         if (!resp_start(c)) {
  725             conn_set_state(c, conn_closing);
  726             return;
  727         }
  728         proxy_out_errstring(c->resp, "request too long");
  729         conn_set_state(c, conn_closing);
  730         return;
  731     }
  732 
  733     if (!resp_start(c)) {
  734         conn_set_state(c, conn_closing);
  735         return;
  736     }
  737 
  738     // Count requests handled by proxy vs local.
  739     // Also batch the counts down this far so we can lock once for the active
  740     // counter instead of twice.
  741     struct proxy_int_stats *istats = c->thread->proxy_int_stats;
  742     WSTAT_L(c->thread);
  743     istats->counters[pr.command]++;
  744     c->thread->stats.proxy_conn_requests++;
  745     c->thread->stats.proxy_req_active++;
  746     WSTAT_UL(c->thread);
  747 
  748     // start a coroutine.
  749     // TODO (v2): This can pull a thread from a cache.
  750     lua_newthread(L);
  751     lua_State *Lc = lua_tothread(L, -1);
  752     // leave the thread first on the stack, so we can reference it if needed.
  753     // pull the lua hook function onto the stack.
  754     lua_rawgeti(Lc, LUA_REGISTRYINDEX, hook->lua_ref);
  755 
  756     mcp_request_t *rq = mcp_new_request(Lc, &pr, command, cmdlen);
  757     if (multiget) {
  758         rq->ascii_multiget = true;
  759     }
  760     // TODO (v2): lift this to a post-processor?
  761     if (rq->pr.vlen != 0) {
  762         // relying on temporary malloc's not succumbing as poorly to
  763         // fragmentation.
  764         c->item = malloc(rq->pr.vlen);
  765         if (c->item == NULL) {
  766             lua_settop(L, 0);
  767             proxy_out_errstring(c->resp, "out of memory");
  768             WSTAT_DECR(c, proxy_req_active, 1);
  769             return;
  770         }
  771         c->item_malloced = true;
  772         c->ritem = c->item;
  773         c->rlbytes = rq->pr.vlen;
  774         c->proxy_coro_ref = luaL_ref(L, LUA_REGISTRYINDEX); // pops coroutine.
  775 
  776         conn_set_state(c, conn_nread);
  777         return;
  778     }
  779 
  780     proxy_run_coroutine(Lc, c->resp, NULL, c);
  781 
  782     lua_settop(L, 0); // clear anything remaining on the main thread.
  783 }
  784 
  785 // analogue for storage_get_item(); add a deferred IO object to the current
  786 // connection's response object. stack enough information to write to the
  787 // server on the submit callback, and enough to resume the lua state on the
  788 // completion callback.
  789 static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc) {
  790     io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_PROXY);
  791 
  792     // stack: request, hash selector. latter just to hold a reference.
  793 
  794     mcp_request_t *rq = luaL_checkudata(Lc, -1, "mcp.request");
  795     mcp_backend_t *be = rq->be;
  796 
  797     // Then we push a response object, which we'll re-use later.
  798     // reserve one uservalue for a lua-supplied response.
  799     mcp_resp_t *r = lua_newuserdatauv(Lc, sizeof(mcp_resp_t), 1);
  800     // FIXME (v2): is this memset still necessary? I was using it for
  801     // debugging.
  802     memset(r, 0, sizeof(mcp_resp_t));
  803     r->buf = NULL;
  804     r->blen = 0;
  805     r->start = rq->start; // need to inherit the original start time.
  806     // Set noreply mode.
  807     // TODO (v2): the response "inherits" the request's noreply mode, which isn't
  808     // strictly correct; we should inherit based on the request that spawned
  809     // the coroutine but the structure doesn't allow that yet.
  810     // Should also be able to settle this exact mode from the parser so we
  811     // don't have to re-branch here.
  812     if (rq->pr.noreply) {
  813         if (rq->pr.cmd_type == CMD_TYPE_META) {
  814             r->mode = RESP_MODE_METAQUIET;
  815             for (int x = 2; x < rq->pr.ntokens; x++) {
  816                 if (rq->request[rq->pr.tokens[x]] == 'q') {
  817                     rq->request[rq->pr.tokens[x]] = ' ';
  818                 }
  819             }
  820         } else {
  821             r->mode = RESP_MODE_NOREPLY;
  822             rq->request[rq->pr.reqlen - 3] = 'Y';
  823         }
  824     } else {
  825         r->mode = RESP_MODE_NORMAL;
  826     }
  827 
  828     int x;
  829     int end = rq->pr.reqlen-2 > RESP_CMD_MAX ? RESP_CMD_MAX : rq->pr.reqlen-2;
  830     for (x = 0; x < end; x++) {
  831         if (rq->pr.request[x] == ' ') {
  832             break;
  833         }
  834         r->cmd[x] = rq->pr.request[x];
  835     }
  836     r->cmd[x] = '\0';
  837 
  838     luaL_getmetatable(Lc, "mcp.response");
  839     lua_setmetatable(Lc, -2);
  840 
  841     io_pending_proxy_t *p = do_cache_alloc(c->thread->io_cache);
  842     if (p == NULL) {
  843         WSTAT_INCR(c, proxy_conn_oom, 1);
  844         proxy_lua_error(Lc, "out of memory allocating from IO cache");
  845         return;
  846     }
  847 
  848     // this is a re-cast structure, so assert that we never outsize it.
  849     assert(sizeof(io_pending_t) >= sizeof(io_pending_proxy_t));
  850     memset(p, 0, sizeof(io_pending_proxy_t));
  851     // set up back references.
  852     p->io_queue_type = IO_QUEUE_PROXY;
  853     p->thread = c->thread;
  854     p->c = c;
  855     p->resp = resp;
  856     p->client_resp = r;
  857     p->flushed = false;
  858     p->ascii_multiget = rq->ascii_multiget;
  859     resp->io_pending = (io_pending_t *)p;
  860 
  861     // top of the main thread should be our coroutine.
  862     // lets grab a reference to it and pop so it doesn't get gc'ed.
  863     p->coro_ref = coro_ref;
  864 
  865     // we'll drop the pointer to the coro on here to save some CPU
  866     // on re-fetching it later. The pointer shouldn't change.
  867     p->coro = Lc;
  868 
  869     // The direct backend object. Lc is holding the reference in the stack
  870     p->backend = be;
  871 
  872     mcp_request_attach(Lc, rq, p);
  873 
  874     // link into the batch chain.
  875     p->next = q->stack_ctx;
  876     q->stack_ctx = p;
  877 
  878     return;
  879 }
  880 
  881 // Common lua debug command.
  882 __attribute__((unused)) void dump_stack(lua_State *L) {
  883     int top = lua_gettop(L);
  884     int i = 1;
  885     fprintf(stderr, "--TOP OF STACK [%d]\n", top);
  886     for (; i < top + 1; i++) {
  887         int type = lua_type(L, i);
  888         // lets find the metatable of this userdata to identify it.
  889         if (lua_getmetatable(L, i) != 0) {
  890             lua_pushstring(L, "__name");
  891             if (lua_rawget(L, -2) != LUA_TNIL) {
  892                 fprintf(stderr, "--|%d| [%s] (%s)\n", i, lua_typename(L, type), lua_tostring(L, -1));
  893                 lua_pop(L, 2);
  894                 continue;
  895             }
  896             lua_pop(L, 2);
  897         }
  898         if (type == LUA_TSTRING) {
  899             fprintf(stderr, "--|%d| [%s] | %s\n", i, lua_typename(L, type), lua_tostring(L, i));
  900         } else {
  901             fprintf(stderr, "--|%d| [%s]\n", i, lua_typename(L, type));
  902         }
  903     }
  904     fprintf(stderr, "-----------------\n");
  905 }
  906 
  907