"Fossies" - the Fresh Open Source Software Archive

Member "memcached-1.6.15/vendor/mcmc/mcmc.c" (30 Mar 2022, 22953 Bytes) of package /linux/www/memcached-1.6.15.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "mcmc.c": 1.6.14_vs_1.6.15.

    1 #include <stdlib.h>
    2 #include <sys/types.h>
    3 #include <sys/socket.h>
    4 #include <sys/uio.h>
    5 #include <netdb.h>
    6 #include <unistd.h>
    7 #include <fcntl.h>
    8 #include <string.h>
    9 #include <stdint.h>
   10 #include <errno.h>
   11 #include <stdio.h>
   12 
   13 #include "mcmc.h"
   14 
   15 // TODO: if there's a parse error or unknown status code, we likely have a
   16 // protocol desync and need to disconnect.
   17 
   18 // NOTE: this _will_ change a bit for adding TLS support.
   19 
   20 // A "reasonable" minimum buffer size to work with.
   21 // Callers are allowed to create a buffer of any size larger than this.
   22 // TODO: Put the math/documentation in here.
   23 // This is essentially the largest return value status line possible.
   24 // at least doubled for wiggle room.
   25 #define MIN_BUFFER_SIZE 2048
   26 
   27 #define FLAG_BUF_IS_ERROR 0x1
   28 #define FLAG_BUF_IS_NUMERIC 0x2
   29 #define FLAG_BUF_WANTED_READ 0x4
   30 
   31 #define STATE_DEFAULT 0 // looking for any kind of response
   32 #define STATE_GET_RESP 1 // processing VALUE's until END
   33 #define STATE_STAT_RESP 2 // processing STAT's until END
   34 #define STATE_STAT_RESP_DONE 3
   35 
   36 typedef struct mcmc_ctx {
   37     int fd;
   38     int gai_status; // getaddrinfo() last status.
   39     int last_sys_error; // last syscall error (connect/etc?)
   40     int sent_bytes_partial; // note for partially sent buffers.
   41     int request_queue; // supposed outstanding replies.
   42     int fail_code; // recent failure reason.
   43     int error; // latest error code.
   44     uint32_t status_flags; // internal only flags.
   45     int state;
   46 
   47     // FIXME: s/buffer_used/buffer_filled/ ?
   48     size_t buffer_used; // amount of bytes read into the buffer so far.
   49     size_t buffer_request_len; // cached endpoint for current request
   50     char *buffer_head; // buffer pointer currently in use.
   51     char *buffer_tail; // consumed tail of the buffer.
   52 
   53     // request response detail.
   54     mcmc_resp_t *resp;
   55 } mcmc_ctx_t;
   56 
   57 // INTERNAL FUNCTIONS
   58 
   59 static int _mcmc_parse_value_line(mcmc_ctx_t *ctx) {
   60     char *buf = ctx->buffer_head;
   61     // we know that "VALUE " has matched, so skip that.
   62     char *p = buf+6;
   63     size_t l = ctx->buffer_request_len;
   64 
   65     // <key> <flags> <bytes> [<cas unique>]
   66     char *key = p;
   67     int keylen;
   68     p = memchr(p, ' ', l - 6);
   69     if (p == NULL) {
   70         // FIXME: these should return MCMC_ERR and set the internal parse
   71         // error code.
   72         return MCMC_PARSE_ERROR;
   73     }
   74 
   75     keylen = p - key;
   76 
   77     // convert flags into something useful.
   78     // FIXME: do we need to prevent overruns in strtoul?
   79     // we know for sure the line will eventually end in a \n.
   80     char *n = NULL;
   81     errno = 0;
   82     uint32_t flags = strtoul(p, &n, 10);
   83     if ((errno == ERANGE) || (p == n) || (*n != ' ')) {
   84         return MCMC_PARSE_ERROR;
   85     }
   86     p = n;
   87 
   88     errno = 0;
   89     uint32_t bytes = strtoul(p, &n, 10);
   90     if ((errno == ERANGE) || (p == n)) {
   91         return MCMC_PARSE_ERROR;
   92     }
   93     p = n;
   94 
   95     // If next byte is a space, we read the optional CAS value.
   96     uint64_t cas = 0;
   97     if (*n == ' ') {
   98         errno = 0;
   99         cas = strtoull(p, &n, 10);
  100         if ((errno == ERANGE) || (p == n)) {
  101             return MCMC_PARSE_ERROR;
  102         }
  103     }
  104 
  105     // If we made it this far, we've parsed everything, stuff the details into
  106     // the context for fetching later.
  107     mcmc_resp_t *r = ctx->resp;
  108     // FIXME: set to NULL if we don't have the value?
  109     r->value = ctx->buffer_tail;
  110     r->vlen = bytes + 2; // add in the \r\n
  111     int buffer_remain = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head);
  112     if (buffer_remain >= r->vlen) {
  113         r->vlen_read = r->vlen;
  114         ctx->buffer_tail += r->vlen;
  115     } else {
  116         r->vlen_read = buffer_remain;
  117     }
  118     r->key = key;
  119     r->klen = keylen;
  120     r->flags = flags;
  121     r->cas = cas;
  122     r->type = MCMC_RESP_GET;
  123     ctx->state = STATE_GET_RESP;
  124 
  125     // NOTE: if value_offset < buffer_used, has part of the value in the
  126     // buffer already.
  127 
  128     return MCMC_OK;
  129 }
  130 
  131 // FIXME: This is broken for ASCII multiget.
  132 // if we get VALUE back, we need to stay in ASCII GET read mode until an END
  133 // is seen.
  134 static int _mcmc_parse_response(mcmc_ctx_t *ctx) {
  135     char *buf = ctx->buffer_head;
  136     char *cur = buf;
  137     size_t l = ctx->buffer_request_len;
  138     int rlen; // response code length.
  139     int more = 0;
  140     mcmc_resp_t *r = ctx->resp;
  141     r->reslen = ctx->buffer_request_len;
  142     r->type = MCMC_RESP_GENERIC;
  143 
  144     // walk until the \r\n
  145     while (l-- > 2) {
  146         if (*cur == ' ') {
  147             more = 1;
  148             break;
  149         }
  150         cur++;
  151     }
  152     rlen = cur - buf;
  153 
  154     // incr/decr returns a number with no code :(
  155     // not checking length first since buf must have at least one char to
  156     // enter this function.
  157     if (buf[0] >= '0' && buf[0] <= '9') {
  158         // TODO: parse it as a number on request.
  159         // TODO: validate whole thing as digits here?
  160         ctx->status_flags |= FLAG_BUF_IS_NUMERIC;
  161         r->type = MCMC_RESP_NUMERIC;
  162         return MCMC_OK;
  163     }
  164 
  165     if (rlen < 2) {
  166         ctx->error = MCMC_PARSE_ERROR_SHORT;
  167         return MCMC_ERR;
  168     }
  169 
  170     int rv = MCMC_OK;
  171     int code = MCMC_CODE_OK;
  172     switch (rlen) {
  173         case 2:
  174             // meta, "OK"
  175             // FIXME: adding new return codes would make the client completely
  176             // fail. The rest of the client is agnostic to requests/flags for
  177             // meta.
  178             // can we make it agnostic for return codes outside of "read this
  179             // data" types?
  180             // As-is it should fail down to the "send the return code to the
  181             // user". not sure that's right.
  182             r->type = MCMC_RESP_META;
  183             switch (buf[0]) {
  184             case 'E':
  185                 if (buf[1] == 'N') {
  186                     code = MCMC_CODE_MISS;
  187                     // TODO: RESP type
  188                 } else if (buf[1] == 'X') {
  189                     code = MCMC_CODE_EXISTS;
  190                 }
  191                 break;
  192             case 'H':
  193                 if (buf[1] == 'D') {
  194                     // typical meta response.
  195                     code = MCMC_CODE_OK;
  196                 }
  197                 break;
  198             case 'M':
  199                 if (buf[1] == 'N') {
  200                     // specific return code so user can see pipeline end.
  201                     code = MCMC_CODE_NOP;
  202                 } else if (buf[1] == 'E') {
  203                     // ME is the debug output line.
  204                     // TODO: this just gets returned as an rline?
  205                     // specific code? specific type?
  206                     // ME <key> <key=value debug line>
  207                     rv = MCMC_OK;
  208                 }
  209                 break;
  210             case 'N':
  211                 if (buf[1] == 'F') {
  212                     code = MCMC_CODE_NOT_FOUND;
  213                 } else if (buf[1] == 'S') {
  214                     code = MCMC_CODE_NOT_STORED;
  215                 }
  216                 break;
  217             case 'O':
  218                 if (buf[1] == 'K') {
  219                     // Used by many random management commands
  220                     r->type = MCMC_RESP_GENERIC;
  221                 }
  222                 break;
  223             case 'V':
  224                 if (buf[1] == 'A') {
  225                     // VA <size> <flags>*\r\n
  226                     if (more) {
  227                         errno = 0;
  228                         char *n = NULL;
  229                         uint32_t vsize = strtoul(cur, &n, 10);
  230                         if ((errno == ERANGE) || (cur == n)) {
  231                             rv = MCMC_ERR;
  232                         } else {
  233                             r->value = ctx->buffer_tail;
  234                             r->vlen = vsize + 2; // tag in the \r\n.
  235                             // FIXME: macro.
  236                             int buffer_remain = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head);
  237                             if (buffer_remain >= r->vlen) {
  238                                 r->vlen_read = r->vlen;
  239                                 ctx->buffer_tail += r->vlen;
  240                             } else {
  241                                 r->vlen_read = buffer_remain;
  242                             }
  243                             cur = n;
  244                             if (*cur != ' ') {
  245                                 more = 0;
  246                             }
  247                         }
  248                     } else {
  249                         rv = MCMC_ERR;
  250                     }
  251                 }
  252                 break;
  253             }
  254             // maybe: if !rv and !fail, do something special?
  255             // if (more), there are flags. shove them in the right place.
  256             if (more) {
  257                 r->rline = cur+1; // eat the space.
  258                 r->rlen = l-1;
  259             } else {
  260                 r->rline = NULL;
  261                 r->rlen = 0;
  262             }
  263             break;
  264         case 3:
  265             if (memcmp(buf, "END", 3) == 0) {
  266                 // Either end of STAT results, or end of ascii GET key list.
  267                 ctx->state = STATE_DEFAULT;
  268                 // FIXME: caller needs to understand if this is a real miss.
  269                 code = MCMC_CODE_MISS;
  270                 r->type = MCMC_RESP_END;
  271                 rv = MCMC_OK;
  272             }
  273             break;
  274         case 4:
  275             if (memcmp(buf, "STAT", 4) == 0) {
  276                 r->type = MCMC_RESP_STAT;
  277                 ctx->state = STATE_STAT_RESP;
  278                 // TODO: initialize stat reader mode.
  279             }
  280             break;
  281         case 5:
  282             if (memcmp(buf, "VALUE", 5) == 0) {
  283                 if (more) {
  284                     // <key> <flags> <bytes> [<cas unique>]
  285                     rv = _mcmc_parse_value_line(ctx);
  286                 } else {
  287                     rv = MCMC_ERR; // FIXME: parse error.
  288                 }
  289             }
  290             break;
  291         case 6:
  292             if (memcmp(buf, "STORED", 6) == 0) {
  293                 code = MCMC_CODE_STORED;
  294             } else if (memcmp(buf, "EXISTS", 6) == 0) {
  295                 code = MCMC_CODE_EXISTS;
  296                 // TODO: type -> ASCII?
  297             }
  298             break;
  299         case 7:
  300             if (memcmp(buf, "DELETED", 7) == 0) {
  301                 code = MCMC_CODE_DELETED;
  302             } else if (memcmp(buf, "TOUCHED", 7) == 0) {
  303                 code = MCMC_CODE_TOUCHED;
  304             } else if (memcmp(buf, "VERSION", 7) == 0) {
  305                 code = MCMC_CODE_VERSION;
  306                 r->type = MCMC_RESP_VERSION;
  307                 // TODO: prep the version line for return
  308             }
  309             break;
  310         case 9:
  311             if (memcmp(buf, "NOT_FOUND", 9) == 0) {
  312                 code = MCMC_CODE_NOT_FOUND;
  313             }
  314             break;
  315         case 10:
  316             if (memcmp(buf, "NOT_STORED", 10) == 0) {
  317                 code = MCMC_CODE_NOT_STORED;
  318             }
  319             break;
  320         default:
  321             // Unknown code, assume error.
  322             break;
  323     }
  324 
  325     r->code = code;
  326     if (rv == -1) {
  327         // TODO: Finish this.
  328         ctx->status_flags |= FLAG_BUF_IS_ERROR;
  329         rv = MCMC_ERR;
  330     }
  331 
  332     return rv;
  333 }
  334 
  335 // EXTERNAL API
  336 
  337 int mcmc_fd(void *c) {
  338     mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
  339     return ctx->fd;
  340 }
  341 
  342 size_t mcmc_size(int options) {
  343     return sizeof(mcmc_ctx_t);
  344 }
  345 
  346 // Allow returning this dynamically based on options set.
  347 // FIXME: it might be more flexible to call this after mcmc_connect()...
  348 // but this is probably more convenient for the caller if it's less dynamic.
  349 size_t mcmc_min_buffer_size(int options) {
  350     return MIN_BUFFER_SIZE;
  351 }
  352 
  353 char *mcmc_read_prep(void *c, char *buf, size_t bufsize, size_t *bufremain) {
  354     mcmc_ctx_t *ctx = c;
  355     char *b = buf + ctx->buffer_used;
  356     *bufremain = bufsize - ctx->buffer_used;
  357     return b;
  358 }
  359 
  360 // Directly parse a buffer with read data of size len.
  361 // r->reslen + r->vlen_read is the bytes consumed from the buffer read.
  362 // Caller manages how to retry if MCMC_WANT_READ or an error happens.
  363 // FIXME: not sure if to keep this command to a fixed buffer size, or continue
  364 // to use the ctx->buffer_used bits... if we keep the buffer_used stuff caller can
  365 // loop without memmove'ing the buffer?
  366 int mcmc_parse_buf(void *c, char *buf, size_t read, mcmc_resp_t *r) {
  367     mcmc_ctx_t *ctx = c;
  368     char *el;
  369     ctx->buffer_used += read;
  370 
  371     el = memchr(buf, '\n', ctx->buffer_used);
  372     if (el == NULL) {
  373         return MCMC_WANT_READ;
  374     }
  375 
  376     memset(r, 0, sizeof(*r));
  377 
  378     // Consume through the newline.
  379     // buffer_tail now points to where value could start.
  380     // FIXME: ctx->value ?
  381     ctx->buffer_tail = el+1;
  382 
  383     // FIXME: the server must be stricter in what it sends back. should always
  384     // have a \r. check for it and fail?
  385     ctx->buffer_request_len = ctx->buffer_tail - buf;
  386     // leave the \r\n in the line end cache.
  387     ctx->buffer_head = buf;
  388     // TODO: handling for nonblock case.
  389 
  390     // We have a result line. Now pass it through the parser.
  391     // Then we indicate to the user that a response is ready.
  392     ctx->resp = r;
  393     return _mcmc_parse_response(ctx);
  394 }
  395 
  396 /*** Functions wrapping syscalls **/
  397 
  398 // TODO: should be able to flip between block and nonblock.
  399 
  400 // used for checking on async connections.
  401 int mcmc_check_nonblock_connect(void *c, int *err) {
  402     mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
  403     socklen_t errsize = sizeof(*err);
  404     if (getsockopt(ctx->fd, SOL_SOCKET, SO_ERROR, err, &errsize) == 0) {
  405         if (*err == 0) {
  406             return MCMC_OK;
  407         }
  408     } else {
  409         // getsockopt failed. still need to pass up the error.
  410         *err = errno;
  411     }
  412 
  413     return MCMC_ERR;
  414 }
  415 
  416 // TODO:
  417 // - option for connecting 4 -> 6 or 6 -> 4
  418 // connect_unix()
  419 // connect_bind_tcp()
  420 // ^ fill an internal struct from the stack and call into this central
  421 // connect?
  422 int mcmc_connect(void *c, char *host, char *port, int options) {
  423     mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
  424 
  425     int s;
  426     int sock;
  427     int res = MCMC_CONNECTED;
  428     struct addrinfo hints;
  429     struct addrinfo *ai;
  430     struct addrinfo *next;
  431 
  432     // Since our cx memory was likely malloc'ed, ensure we start clear.
  433     memset(ctx, 0, sizeof(mcmc_ctx_t));
  434     memset(&hints, 0, sizeof(hints));
  435     hints.ai_family = AF_INET;
  436     hints.ai_socktype = SOCK_STREAM;
  437 
  438     s = getaddrinfo(host, port, &hints, &ai);
  439 
  440     if (s != 0) {
  441         hints.ai_family = AF_INET6;
  442         s = getaddrinfo(host, port, &hints, &ai);
  443         if (s != 0) {
  444             // TODO: gai_strerror(s)
  445             ctx->gai_status = s;
  446             res = MCMC_ERR;
  447             goto end;
  448         }
  449     }
  450 
  451     for (next = ai; next != NULL; next = next->ai_next) {
  452         sock = socket(next->ai_family, next->ai_socktype,
  453                 next->ai_protocol);
  454         if (sock == -1)
  455             continue;
  456 
  457         if (options & MCMC_OPTION_TCP_KEEPALIVE) {
  458             int optval = 1;
  459             if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)) < 0) {
  460                 res = MCMC_ERR;
  461                 close(sock);
  462                 goto end;
  463             }
  464         }
  465 
  466         if (options & MCMC_OPTION_NONBLOCK) {
  467             int flags = fcntl(sock, F_GETFL);
  468             if (flags < 0) {
  469                 res = MCMC_ERR;
  470                 close(sock);
  471                 goto end;
  472             }
  473             if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) {
  474                 res = MCMC_ERR;
  475                 close(sock);
  476                 goto end;
  477             }
  478             res = MCMC_CONNECTING;
  479 
  480             if (connect(sock, next->ai_addr, next->ai_addrlen) != -1) {
  481                 if (errno == EINPROGRESS) {
  482                     break; // We're good, stop the loop.
  483                 }
  484             }
  485 
  486             break;
  487         } else {
  488             // TODO: BIND local port.
  489             if (connect(sock, next->ai_addr, next->ai_addrlen) != -1)
  490                 break;
  491         }
  492 
  493         close(sock);
  494     }
  495 
  496     // TODO: cache last connect status code?
  497     if (next == NULL) {
  498         res = MCMC_ERR;
  499         goto end;
  500     }
  501 
  502     ctx->fd = sock;
  503 end:
  504     if (ai) {
  505         freeaddrinfo(ai);
  506     }
  507     return res;
  508 }
  509 
  510 // NOTE: if WANT_WRITE returned, call with same arguments.
  511 // FIXME: len -> size_t?
  512 // TODO: rename to mcmc_request_send
  513 int mcmc_send_request(void *c, const char *request, int len, int count) {
  514     mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
  515 
  516     // adjust our send buffer by how much has already been sent.
  517     const char *r = request + ctx->sent_bytes_partial;
  518     int l = len - ctx->sent_bytes_partial;
  519     int sent = send(ctx->fd, r, l, 0);
  520     if (sent == -1) {
  521         // implicitly handle nonblock mode.
  522         if (errno == EAGAIN || errno == EWOULDBLOCK) {
  523             return MCMC_WANT_WRITE;
  524         } else {
  525             return MCMC_ERR;
  526         }
  527     }
  528 
  529     if (sent < len) {
  530         // can happen anytime, but mostly in nonblocking mode.
  531         ctx->sent_bytes_partial += sent;
  532         return MCMC_WANT_WRITE;
  533     } else {
  534         ctx->request_queue += count;
  535         ctx->sent_bytes_partial = 0;
  536     }
  537 
  538     return MCMC_OK;
  539 }
  540 
  541 // TODO: pretty sure I don't want this function chewing on a submitted iov
  542 // stack, but it might make for less client code :(
  543 // so for now, lets not.
  544 int mcmc_request_writev(void *c, const struct iovec *iov, int iovcnt, ssize_t *sent, int count) {
  545     mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
  546     // need to track sent vs tosend to know when to update counters.
  547     ssize_t tosend = 0;
  548     for (int i = 0; i < iovcnt; i++) {
  549         tosend += iov[i].iov_len;
  550     }
  551 
  552     *sent = writev(ctx->fd, iov, iovcnt);
  553     if (*sent == -1) {
  554         // implicitly handle nonblock mode.
  555         if (errno == EAGAIN || errno == EWOULDBLOCK) {
  556             return MCMC_WANT_WRITE;
  557         } else {
  558             return MCMC_ERR;
  559         }
  560     }
  561 
  562     if (*sent < tosend) {
  563         // can happen anytime, but mostly in nonblocking mode.
  564         return MCMC_WANT_WRITE;
  565     } else {
  566         // FIXME: user has to keep submitting the same count value...
  567         // should decide on whether or not to give up on this.
  568         ctx->request_queue += count;
  569     }
  570 
  571     return MCMC_OK;
  572 }
  573 
  574 // TODO: avoid recv if we have bytes in the buffer.
  575 int mcmc_read(void *c, char *buf, size_t bufsize, mcmc_resp_t *r) {
  576     mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
  577     char *el;
  578     memset(r, 0, sizeof(*r));
  579 
  580     // If there's still data in the buffer try to use it before potentially
  581     // hanging on the network read.
  582     // Also skip this check if we specifically wanted more bytes from net.
  583     if (ctx->buffer_used && !(ctx->status_flags & FLAG_BUF_WANTED_READ)) {
  584         el = memchr(buf, '\n', ctx->buffer_used);
  585         if (el) {
  586             goto parse;
  587         }
  588     }
  589 
  590     // adjust buffer by how far we've already consumed.
  591     char *b = buf + ctx->buffer_used;
  592     size_t l = bufsize - ctx->buffer_used;
  593 
  594     int read = recv(ctx->fd, b, l, 0);
  595     if (read == 0) {
  596         return MCMC_NOT_CONNECTED;
  597     } else if (read == -1) {
  598         // implicitly handle nonblocking configurations.
  599         if (errno == EAGAIN || errno == EWOULDBLOCK) {
  600             return MCMC_WANT_READ;
  601         } else {
  602             return MCMC_ERR;
  603         }
  604     }
  605 
  606     ctx->buffer_used += read;
  607 
  608     // Always scan from the start of the original buffer.
  609     el = memchr(buf, '\n', ctx->buffer_used);
  610     if (!el) {
  611         // FIXME: error if buffer is full but no \n is found.
  612         ctx->status_flags |= FLAG_BUF_WANTED_READ;
  613         return MCMC_WANT_READ;
  614     }
  615 parse:
  616     // Consume through the newline.
  617     // buffer_tail now points to where a value could start.
  618     ctx->buffer_tail = el+1;
  619 
  620     // FIXME: the server must be stricter in what it sends back. should always
  621     // have a \r. check for it and fail?
  622     ctx->buffer_request_len = ctx->buffer_tail - buf;
  623     // leave the \r\n in the line end cache.
  624     ctx->buffer_head = buf;
  625     // TODO: handling for nonblock case.
  626 
  627     // We have a result line. Now pass it through the parser.
  628     // Then we indicate to the user that a response is ready.
  629     ctx->resp = r;
  630     return _mcmc_parse_response(ctx);
  631 }
  632 
  633 void mcmc_get_error(void *c, char *code, size_t clen, char *msg, size_t mlen) {
  634     code[0] = '\0';
  635     msg[0] = '\0';
  636 }
  637 
  638 int mcmc_read_value_buf(void *c, char *val, const size_t vsize, int *read) {
  639     mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
  640 
  641     // If the distance between tail/head is smaller than what we read into the
  642     // main buffer, we have some value to copy out.
  643     int leftover = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head);
  644     if (leftover > 0) {
  645         int tocopy = leftover > vsize ? vsize : leftover;
  646         memcpy(val + *read, ctx->buffer_tail, tocopy);
  647         ctx->buffer_tail += tocopy;
  648         *read += tocopy;
  649         if (leftover > tocopy) {
  650             // FIXME: think we need a specific code for "value didn't fit"
  651             return MCMC_WANT_READ;
  652         }
  653     }
  654 
  655     return MCMC_OK;
  656 }
  657 
  658 // read into the buffer, up to a max size of vsize.
  659 // will read (vsize-read) into the buffer pointed to by (val+read).
  660 // you are able to stream the value into different buffers, or process the
  661 // value and reuse the same buffer, by adjusting vsize and *read between
  662 // calls.
  663 // vsize must not be larger than the remaining value size pending read.
  664 int mcmc_read_value(void *c, char *val, const size_t vsize, int *read) {
  665     mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
  666     size_t l;
  667 
  668     // If the distance between tail/head is smaller than what we read into the
  669     // main buffer, we have some value to copy out.
  670     int leftover = ctx->buffer_used - (ctx->buffer_tail - ctx->buffer_head);
  671     if (leftover > 0) {
  672         int tocopy = leftover > vsize ? vsize : leftover;
  673         memcpy(val + *read, ctx->buffer_tail, tocopy);
  674         ctx->buffer_tail += tocopy;
  675         *read += tocopy;
  676         if (leftover > tocopy) {
  677             // FIXME: think we need a specific code for "value didn't fit"
  678             return MCMC_WANT_READ;
  679         }
  680     }
  681 
  682     char *v = val + *read;
  683     l = vsize - *read;
  684 
  685     int r = recv(ctx->fd, v, l, 0);
  686     if (r == 0) {
  687         // TODO: some internal disconnect work?
  688         return MCMC_NOT_CONNECTED;
  689     }
  690     // FIXME: EAGAIN || EWOULDBLOCK!
  691     if (r == -1) {
  692         return MCMC_ERR;
  693     }
  694 
  695     *read += r;
  696 
  697     if (*read < vsize) {
  698         return MCMC_WANT_READ;
  699     } else {
  700         return MCMC_OK;
  701     }
  702 }
  703 
  704 char *mcmc_buffer_consume(void *c, int *remain) {
  705     mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
  706     ctx->buffer_used -= ctx->buffer_tail - ctx->buffer_head;
  707     int used = ctx->buffer_used;
  708     char *newbuf = ctx->buffer_tail;
  709 
  710     // FIXME: request_queue-- is in the wrong place.
  711     // TODO: which of these _must_ be reset between requests? I think very
  712     // little?
  713     ctx->request_queue--;
  714     ctx->status_flags = 0;
  715     ctx->buffer_head = NULL;
  716     ctx->buffer_tail = NULL;
  717 
  718     if (used) {
  719         *remain = used;
  720         return newbuf;
  721     } else {
  722         return NULL;
  723     }
  724 }
  725 
  726 int mcmc_disconnect(void *c) {
  727     mcmc_ctx_t *ctx = (mcmc_ctx_t *)c;
  728 
  729     // FIXME: I forget if 0 can be valid.
  730     if (ctx->fd != 0) {
  731         close(ctx->fd);
  732         return MCMC_OK;
  733     } else {
  734         return MCMC_NOT_CONNECTED;
  735     }
  736 }