"Fossies" - the Fresh Open Source Software Archive

Member "haproxy-2.0.0/contrib/spoa_example/spoa.c" (16 Jun 2019, 49152 Bytes) of package /linux/misc/haproxy-2.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here.

    1 /*
    2  * A Random IP reputation service acting as a Stream Processing Offload Agent
    3  *
    4  * This is a very simple service that implements a "random" IP reputation
    5  * service. It will return random scores for all checked IP addresses. It only
    6  * shows you how to implement an IP reputation service or similar services
    7  * using the SPOE.
    8  *
    9  * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
   10  *
   11  * This program is free software; you can redistribute it and/or
   12  * modify it under the terms of the GNU General Public License
   13  * as published by the Free Software Foundation; either version
   14  * 2 of the License, or (at your option) any later version.
   15  *
   16  */
   17 #include <unistd.h>
   18 #include <stdlib.h>
   19 #include <string.h>
   20 #include <stdbool.h>
   21 #include <errno.h>
   22 #include <stdio.h>
   23 #include <signal.h>
   24 #include <arpa/inet.h>
   25 #include <netinet/in.h>
   26 #include <netinet/tcp.h>
   27 #include <sys/socket.h>
   28 #include <err.h>
   29 #include <ctype.h>
   30 
   31 #include <pthread.h>
   32 
   33 #include <event2/util.h>
   34 #include <event2/event.h>
   35 #include <event2/event_struct.h>
   36 #include <event2/thread.h>
   37 
   38 #include <mini-clist.h>
   39 #include <spoe_types.h>
   40 #include <spop_functions.h>
   41 
   42 #define DEFAULT_PORT       12345
   43 #define CONNECTION_BACKLOG 10
   44 #define NUM_WORKERS        10
   45 #define MAX_FRAME_SIZE     16384
   46 #define SPOP_VERSION       "2.0"
   47 
   48 #define SLEN(str) (sizeof(str)-1)
   49 
   50 #define LOG(worker, fmt, args...)                                       \
   51     do {                                \
   52         struct timeval  now;                    \
   53                                                                         \
   54         gettimeofday(&now, NULL);               \
   55         fprintf(stderr, "%ld.%06ld [%02d] " fmt "\n",       \
   56             now.tv_sec, now.tv_usec, (worker)->id, ##args); \
   57     } while (0)
   58 
   59 #define DEBUG(x...)             \
   60     do {                    \
   61         if (debug)          \
   62             LOG(x);         \
   63     } while (0)
   64 
   65 
   66 enum spoa_state {
   67     SPOA_ST_CONNECTING = 0,
   68     SPOA_ST_PROCESSING,
   69     SPOA_ST_DISCONNECTING,
   70 };
   71 
   72 enum spoa_frame_type {
   73     SPOA_FRM_T_UNKNOWN = 0,
   74     SPOA_FRM_T_HAPROXY,
   75     SPOA_FRM_T_AGENT,
   76 };
   77 
   78 struct spoe_engine {
   79     char       *id;
   80 
   81     struct list processing_frames;
   82     struct list outgoing_frames;
   83 
   84     struct list clients;
   85     struct list list;
   86 };
   87 
   88 struct spoe_frame {
   89     enum spoa_frame_type type;
   90     char                *buf;
   91     unsigned int         offset;
   92     unsigned int         len;
   93 
   94     unsigned int         stream_id;
   95     unsigned int         frame_id;
   96     unsigned int         flags;
   97     bool                 hcheck;     /* true is the CONNECT frame is a healthcheck */
   98     bool                 fragmented; /* true if the frame is fragmented */
   99     int                  ip_score;   /* -1 if unset, else between 0 and 100 */
  100 
  101     struct event         process_frame_event;
  102     struct worker       *worker;
  103     struct spoe_engine  *engine;
  104     struct client       *client;
  105     struct list          list;
  106 
  107     char                *frag_buf; /* used to accumulate payload of a fragmented frame */
  108     unsigned int         frag_len;
  109 
  110     char                 data[0];
  111 };
  112 
  113 struct client {
  114     int                 fd;
  115     unsigned long       id;
  116     enum spoa_state     state;
  117 
  118     struct event        read_frame_event;
  119     struct event        write_frame_event;
  120 
  121     struct spoe_frame  *incoming_frame;
  122     struct spoe_frame  *outgoing_frame;
  123 
  124     struct list         processing_frames;
  125     struct list         outgoing_frames;
  126 
  127     unsigned int        max_frame_size;
  128     int                 status_code;
  129 
  130     char               *engine_id;
  131     struct spoe_engine *engine;
  132     bool                pipelining;
  133     bool                async;
  134     bool                fragmentation;
  135 
  136     struct worker      *worker;
  137     struct list         by_worker;
  138     struct list         by_engine;
  139 };
  140 
  141 struct worker {
  142     pthread_t           thread;
  143     int                 id;
  144     struct event_base  *base;
  145     struct event       *monitor_event;
  146 
  147     struct list         engines;
  148 
  149     unsigned int        nbclients;
  150     struct list         clients;
  151 
  152     struct list         frames;
  153     unsigned int        nbframes;
  154 };
  155 
  156 
  157 /* Globals */
  158 static struct worker *workers          = NULL;
  159 static struct worker  null_worker      = { .id = 0 };
  160 static unsigned long  clicount         = 0;
  161 static int            server_port      = DEFAULT_PORT;
  162 static int            num_workers      = NUM_WORKERS;
  163 static unsigned int   max_frame_size   = MAX_FRAME_SIZE;
  164 struct timeval        processing_delay = {0, 0};
  165 static bool           debug            = false;
  166 static bool           pipelining       = false;
  167 static bool           async            = false;
  168 static bool           fragmentation    = false;
  169 
  170 
  171 static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
  172     [SPOE_FRM_ERR_NONE]               = "normal",
  173     [SPOE_FRM_ERR_IO]                 = "I/O error",
  174     [SPOE_FRM_ERR_TOUT]               = "a timeout occurred",
  175     [SPOE_FRM_ERR_TOO_BIG]            = "frame is too big",
  176     [SPOE_FRM_ERR_INVALID]            = "invalid frame received",
  177     [SPOE_FRM_ERR_NO_VSN]             = "version value not found",
  178     [SPOE_FRM_ERR_NO_FRAME_SIZE]      = "max-frame-size value not found",
  179     [SPOE_FRM_ERR_NO_CAP]             = "capabilities value not found",
  180     [SPOE_FRM_ERR_BAD_VSN]            = "unsupported version",
  181     [SPOE_FRM_ERR_BAD_FRAME_SIZE]     = "max-frame-size too big or too small",
  182     [SPOE_FRM_ERR_FRAG_NOT_SUPPORTED] = "fragmentation not supported",
  183     [SPOE_FRM_ERR_INTERLACED_FRAMES]  = "invalid interlaced frames",
  184     [SPOE_FRM_ERR_FRAMEID_NOTFOUND]   = "frame-id not found",
  185     [SPOE_FRM_ERR_RES]                = "resource allocation error",
  186     [SPOE_FRM_ERR_UNKNOWN]            = "an unknown error occurred",
  187 };
  188 
  189 static void signal_cb(evutil_socket_t, short, void *);
  190 static void accept_cb(evutil_socket_t, short, void *);
  191 static void worker_monitor_cb(evutil_socket_t, short, void *);
  192 static void process_frame_cb(evutil_socket_t, short, void *);
  193 static void read_frame_cb(evutil_socket_t, short, void *);
  194 static void write_frame_cb(evutil_socket_t, short, void *);
  195 
  196 static void use_spoe_engine(struct client *);
  197 static void unuse_spoe_engine(struct client *);
  198 static void release_frame(struct spoe_frame *);
  199 static void release_client(struct client *);
  200 
  201 static void
  202 check_ipv4_reputation(struct spoe_frame *frame, struct in_addr *ipv4)
  203 {
  204     char str[INET_ADDRSTRLEN];
  205 
  206     if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL)
  207         return;
  208 
  209     frame->ip_score = random() % 100;
  210 
  211     DEBUG(frame->worker, "IP score for %.*s is %d",
  212           INET_ADDRSTRLEN, str, frame->ip_score);
  213 }
  214 
  215 static void
  216 check_ipv6_reputation(struct spoe_frame *frame, struct in6_addr *ipv6)
  217 {
  218     char str[INET6_ADDRSTRLEN];
  219 
  220     if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL)
  221         return;
  222 
  223     frame->ip_score = random() % 100;
  224 
  225     DEBUG(frame->worker, "IP score for %.*s is %d",
  226           INET6_ADDRSTRLEN, str, frame->ip_score);
  227 }
  228 
  229 
  230 /* Check the protocol version. It returns -1 if an error occurred, the number of
  231  * read bytes otherwise. */
  232 static int
  233 check_proto_version(struct spoe_frame *frame, char **buf, char *end)
  234 {
  235     char      *str, *p = *buf;
  236     uint64_t   sz;
  237     int        ret;
  238 
  239     /* Get the list of all supported versions by HAProxy */
  240     if ((*p++ & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR)
  241         return -1;
  242     ret = spoe_decode_buffer(&p, end, &str, &sz);
  243     if (ret == -1 || !str)
  244         return -1;
  245 
  246     DEBUG(frame->worker, "<%lu> Supported versions : %.*s",
  247           frame->client->id, (int)sz, str);
  248 
  249     /* TODO: Find the right verion in supported ones */
  250 
  251     ret  = (p - *buf);
  252     *buf = p;
  253     return ret;
  254 }
  255 
  256 /* Check max frame size value. It returns -1 if an error occurred, the number of
  257  * read bytes otherwise. */
  258 static int
  259 check_max_frame_size(struct spoe_frame *frame, char **buf, char *end)
  260 {
  261     char    *p = *buf;
  262     uint64_t sz;
  263     int      type, ret;
  264 
  265     /* Get the max-frame-size value of HAProxy */
  266     type =  *p++;
  267     if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32  &&
  268         (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64  &&
  269         (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
  270         (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64)
  271         return -1;
  272     if (decode_varint(&p, end, &sz) == -1)
  273         return -1;
  274 
  275     /* Keep the lower value */
  276     if (sz < frame->client->max_frame_size)
  277         frame->client->max_frame_size = sz;
  278 
  279     DEBUG(frame->worker, "<%lu> HAProxy maximum frame size : %u",
  280           frame->client->id, (unsigned int)sz);
  281 
  282     ret  = (p - *buf);
  283     *buf = p;
  284     return ret;
  285 }
  286 
  287 /* Check healthcheck value. It returns -1 if an error occurred, the number of
  288  * read bytes otherwise. */
  289 static int
  290 check_healthcheck(struct spoe_frame *frame, char **buf, char *end)
  291 {
  292     char *p = *buf;
  293     int   type, ret;
  294 
  295     /* Get the "healthcheck" value */
  296     type = *p++;
  297     if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_BOOL)
  298         return -1;
  299     frame->hcheck = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
  300 
  301     DEBUG(frame->worker, "<%lu> HELLO healthcheck : %s",
  302           frame->client->id, (frame->hcheck ? "true" : "false"));
  303 
  304     ret  = (p - *buf);
  305     *buf = p;
  306     return ret;
  307 }
  308 
  309 /* Check capabilities value. It returns -1 if an error occurred, the number of
  310  * read bytes otherwise. */
  311 static int
  312 check_capabilities(struct spoe_frame *frame, char **buf, char *end)
  313 {
  314     struct client *client = frame->client;
  315     char          *str, *p = *buf;
  316     uint64_t       sz;
  317     int            ret;
  318 
  319     if ((*p++ & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR)
  320         return -1;
  321     if (spoe_decode_buffer(&p, end, &str, &sz) == -1)
  322         return -1;
  323     if (str == NULL) /* this is not an error */
  324         goto end;
  325 
  326     DEBUG(frame->worker, "<%lu> HAProxy capabilities : %.*s",
  327           client->id, (int)sz, str);
  328 
  329     while (sz) {
  330         char *delim;
  331 
  332         /* Skip leading spaces */
  333         for (; isspace(*str) && sz; sz--);
  334 
  335         if (sz >= 10 && !strncmp(str, "pipelining", 10)) {
  336             str += 10; sz -= 10;
  337             if (!sz || isspace(*str) || *str == ',') {
  338                 DEBUG(frame->worker,
  339                       "<%lu> HAProxy supports frame pipelining",
  340                       client->id);
  341                 client->pipelining = true;
  342             }
  343         }
  344         else if (sz >= 5 && !strncmp(str, "async", 5)) {
  345             str += 5; sz -= 5;
  346             if (!sz || isspace(*str) || *str == ',') {
  347                 DEBUG(frame->worker,
  348                       "<%lu> HAProxy supports asynchronous frame",
  349                       client->id);
  350                 client->async = true;
  351             }
  352         }
  353         else if (sz >= 13 && !strncmp(str, "fragmentation", 13)) {
  354             str += 13; sz -= 13;
  355             if (!sz || isspace(*str) || *str == ',') {
  356                 DEBUG(frame->worker,
  357                       "<%lu> HAProxy supports fragmented frame",
  358                       client->id);
  359                 client->fragmentation = true;
  360             }
  361         }
  362 
  363         if (!sz || (delim = memchr(str, ',', sz)) == NULL)
  364             break;
  365         delim++;
  366         sz -= (delim - str);
  367         str = delim;
  368     }
  369   end:
  370     ret  = (p - *buf);
  371     *buf = p;
  372     return ret;
  373 }
  374 
  375 /* Check engine-id value. It returns -1 if an error occurred, the number of
  376  * read bytes otherwise. */
  377 static int
  378 check_engine_id(struct spoe_frame *frame, char **buf, char *end)
  379 {
  380     struct client *client = frame->client;
  381     char          *str, *p = *buf;
  382     uint64_t       sz;
  383     int            ret;
  384 
  385     if ((*p++ & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR)
  386         return -1;
  387 
  388     if (spoe_decode_buffer(&p, end, &str, &sz) == -1)
  389         return -1;
  390     if (str == NULL) /* this is not an error */
  391         goto end;
  392 
  393     if (client->engine != NULL)
  394         goto end;
  395 
  396     DEBUG(frame->worker, "<%lu> HAProxy engine id : %.*s",
  397           client->id, (int)sz, str);
  398 
  399     client->engine_id = strndup(str, (int)sz);
  400   end:
  401     ret  = (p - *buf);
  402     *buf = p;
  403     return ret;
  404 }
  405 
  406 static int
  407 acc_payload(struct spoe_frame *frame)
  408 {
  409     struct client *client = frame->client;
  410     char          *buf;
  411     size_t         len = frame->len - frame->offset;
  412     int            ret = frame->offset;
  413 
  414     /* No need to accumulation payload */
  415     if (frame->fragmented == false)
  416         return ret;
  417 
  418     buf = realloc(frame->frag_buf, frame->frag_len + len);
  419     if (buf == NULL) {
  420         client->status_code = SPOE_FRM_ERR_RES;
  421         return -1;
  422     }
  423     memcpy(buf + frame->frag_len, frame->buf + frame->offset, len);
  424     frame->frag_buf  = buf;
  425     frame->frag_len += len;
  426 
  427     if (!(frame->flags & SPOE_FRM_FL_FIN)) {
  428         /* Wait for next parts */
  429         frame->buf    = (char *)(frame->data);
  430         frame->offset = 0;
  431         frame->len    = 0;
  432         frame->flags  = 0;
  433         return 1;
  434     }
  435 
  436     frame->buf    = frame->frag_buf;
  437     frame->len    = frame->frag_len;
  438     frame->offset = 0;
  439     return ret;
  440 }
  441 
  442 /* Check disconnect status code. It returns -1 if an error occurred, the number
  443  * of read bytes otherwise. */
  444 static int
  445 check_discon_status_code(struct spoe_frame *frame, char **buf, char *end)
  446 {
  447     char    *p = *buf;
  448     uint64_t sz;
  449     int      type, ret;
  450 
  451     /* Get the "status-code" value */
  452     type =  *p++;
  453     if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
  454         (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
  455         (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
  456         (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64)
  457         return -1;
  458     if (decode_varint(&p, end, &sz) == -1)
  459         return -1;
  460 
  461     frame->client->status_code = (unsigned int)sz;
  462 
  463     DEBUG(frame->worker, "<%lu> Disconnect status code : %u",
  464           frame->client->id, frame->client->status_code);
  465 
  466     ret  = (p - *buf);
  467     *buf = p;
  468     return ret;
  469 }
  470 
  471 /* Check the disconnect message. It returns -1 if an error occurred, the number
  472  * of read bytes otherwise. */
  473 static int
  474 check_discon_message(struct spoe_frame *frame, char **buf, char *end)
  475 {
  476     char    *str, *p = *buf;
  477     uint64_t sz;
  478     int      ret;
  479 
  480     /* Get the "message" value */
  481     if ((*p++ & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR)
  482         return -1;
  483     ret = spoe_decode_buffer(&p, end, &str, &sz);
  484     if (ret == -1 || !str)
  485         return -1;
  486 
  487     DEBUG(frame->worker, "<%lu> Disconnect message : %.*s",
  488           frame->client->id, (int)sz, str);
  489 
  490     ret  = (p - *buf);
  491     *buf = p;
  492     return ret;
  493 }
  494 
  495 
  496 
  497 /* Decode a HELLO frame received from HAProxy. It returns -1 if an error
  498  * occurred, otherwise the number of read bytes. HELLO frame cannot be
  499  * ignored and having another frame than a HELLO frame is an error. */
  500 static int
  501 handle_hahello(struct spoe_frame *frame)
  502 {
  503     struct client *client = frame->client;
  504     char          *p, *end;
  505 
  506     p = frame->buf;
  507     end = frame->buf + frame->len;
  508 
  509     /* Check frame type: we really want a HELLO frame */
  510     if (*p++ != SPOE_FRM_T_HAPROXY_HELLO)
  511         goto error;
  512 
  513     DEBUG(frame->worker, "<%lu> Decode HAProxy HELLO frame", client->id);
  514 
  515     /* Retrieve flags */
  516     memcpy((char *)&(frame->flags), p, 4);
  517     frame->flags = ntohl(frame->flags);
  518     p += 4;
  519 
  520     /* Fragmentation is not supported for HELLO frame */
  521     if (!(frame->flags & SPOE_FRM_FL_FIN)) {
  522         client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
  523         goto error;
  524     }
  525 
  526     /* stream-id and frame-id must be cleared */
  527     if (*p != 0 || *(p+1) != 0) {
  528         client->status_code = SPOE_FRM_ERR_INVALID;
  529         goto error;
  530     }
  531     p += 2;
  532 
  533     /* Loop on K/V items */
  534     while (p < end) {
  535         char     *str;
  536         uint64_t  sz;
  537 
  538         /* Decode the item name */
  539         spoe_decode_buffer(&p, end, &str, &sz);
  540         if (!str) {
  541             client->status_code = SPOE_FRM_ERR_INVALID;
  542             goto error;
  543         }
  544 
  545         /* Check "supported-versions" K/V item */
  546         if (!memcmp(str, "supported-versions", sz)) {
  547             if (check_proto_version(frame, &p, end)  == -1) {
  548                 client->status_code = SPOE_FRM_ERR_INVALID;
  549                 goto error;
  550             }
  551         }
  552         /* Check "max-frame-size" K/V item */
  553         else if (!memcmp(str, "max-frame-size", sz)) {
  554             if (check_max_frame_size(frame, &p, end) == -1) {
  555                 client->status_code = SPOE_FRM_ERR_INVALID;
  556                 goto error;
  557             }
  558         }
  559         /* Check "healthcheck" K/V item */
  560         else if (!memcmp(str, "healthcheck", sz)) {
  561             if (check_healthcheck(frame, &p, end) == -1) {
  562                 client->status_code = SPOE_FRM_ERR_INVALID;
  563                 goto error;
  564             }
  565         }
  566         /* Check "capabilities" K/V item */
  567         else if (!memcmp(str, "capabilities", sz)) {
  568             if (check_capabilities(frame, &p, end) == -1) {
  569                 client->status_code = SPOE_FRM_ERR_INVALID;
  570                 goto error;
  571             }
  572         }
  573         /* Check "engine-id" K/V item */
  574         else if (!memcmp(str, "engine-id", sz)) {
  575             if (check_engine_id(frame, &p, end) == -1) {
  576                 client->status_code = SPOE_FRM_ERR_INVALID;
  577                 goto error;
  578             }
  579         }
  580         else {
  581             DEBUG(frame->worker, "<%lu> Skip K/V item : key=%.*s",
  582                   client->id, (int)sz, str);
  583 
  584             /* Silently ignore unknown item */
  585             if (spoe_skip_data(&p, end) == -1) {
  586                 client->status_code = SPOE_FRM_ERR_INVALID;
  587                 goto error;
  588             }
  589         }
  590     }
  591 
  592     if (async == false || client->engine_id == NULL)
  593         client->async = false;
  594     if (pipelining == false)
  595         client->pipelining = false;
  596 
  597     if (client->async == true)
  598         use_spoe_engine(client);
  599 
  600     return (p - frame->buf);
  601   error:
  602     return -1;
  603 }
  604 
  605 /* Decode a DISCONNECT frame received from HAProxy. It returns -1 if an error
  606  * occurred, otherwise the number of read bytes. DISCONNECT frame cannot be
  607  * ignored and having another frame than a DISCONNECT frame is an error.*/
  608 static int
  609 handle_hadiscon(struct spoe_frame *frame)
  610 {
  611     struct client *client = frame->client;
  612     char          *p, *end;
  613 
  614     p = frame->buf;
  615     end = frame->buf + frame->len;
  616 
  617     /* Check frame type: we really want a DISCONNECT frame */
  618     if (*p++ != SPOE_FRM_T_HAPROXY_DISCON)
  619         goto error;
  620 
  621     DEBUG(frame->worker, "<%lu> Decode HAProxy DISCONNECT frame", client->id);
  622 
  623     /* Retrieve flags */
  624     memcpy((char *)&(frame->flags), p, 4);
  625     frame->flags = ntohl(frame->flags);
  626     p += 4;
  627 
  628     /* Fragmentation is not supported for DISCONNECT frame */
  629     if (!(frame->flags & SPOE_FRM_FL_FIN)) {
  630         client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
  631         goto error;
  632     }
  633 
  634     /* stream-id and frame-id must be cleared */
  635     if (*p != 0 || *(p+1) != 0) {
  636         client->status_code = SPOE_FRM_ERR_INVALID;
  637         goto error;
  638     }
  639     p += 2;
  640 
  641     client->status_code = SPOE_FRM_ERR_NONE;
  642 
  643     /* Loop on K/V items */
  644     while (p < end) {
  645         char     *str;
  646         uint64_t  sz;
  647 
  648         /* Decode item key */
  649         spoe_decode_buffer(&p, end, &str, &sz);
  650         if (!str) {
  651             client->status_code = SPOE_FRM_ERR_INVALID;
  652             goto error;
  653         }
  654 
  655         /* Check "status-code" K/V item */
  656         if (!memcmp(str, "status-code", sz)) {
  657             if (check_discon_status_code(frame, &p, end) == -1) {
  658                 client->status_code = SPOE_FRM_ERR_INVALID;
  659                 goto error;
  660             }
  661         }
  662         /* Check "message" K/V item */
  663         else if (!memcmp(str, "message", sz)) {
  664             if (check_discon_message(frame, &p, end) == -1) {
  665                 client->status_code = SPOE_FRM_ERR_INVALID;
  666                 goto error;
  667             }
  668         }
  669         else {
  670             DEBUG(frame->worker, "<%lu> Skip K/V item : key=%.*s",
  671                   client->id, (int)sz, str);
  672 
  673             /* Silently ignore unknown item */
  674             if (spoe_skip_data(&p, end) == -1) {
  675                 client->status_code = SPOE_FRM_ERR_INVALID;
  676                 goto error;
  677             }
  678         }
  679     }
  680 
  681     return (p - frame->buf);
  682   error:
  683     return -1;
  684 }
  685 
  686 /* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
  687  * occurred, 0 if it must be must be ignored, otherwise the number of read
  688  * bytes. */
  689 static int
  690 handle_hanotify(struct spoe_frame *frame)
  691 {
  692     struct client *client = frame->client;
  693     char          *p, *end;
  694     uint64_t       stream_id, frame_id;
  695 
  696     p = frame->buf;
  697     end = frame->buf + frame->len;
  698 
  699     /* Check frame type */
  700     if (*p++ != SPOE_FRM_T_HAPROXY_NOTIFY)
  701         goto ignore;
  702 
  703     DEBUG(frame->worker, "<%lu> Decode HAProxy NOTIFY frame", client->id);
  704 
  705     /* Retrieve flags */
  706     memcpy((char *)&(frame->flags), p, 4);
  707     frame->flags = ntohl(frame->flags);
  708     p += 4;
  709 
  710     /* Fragmentation is not supported */
  711     if (!(frame->flags & SPOE_FRM_FL_FIN) && fragmentation == false) {
  712         client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
  713         goto error;
  714     }
  715 
  716     /* Read the stream-id and frame-id */
  717     if (decode_varint(&p, end, &stream_id) == -1)
  718         goto ignore;
  719     if (decode_varint(&p, end, &frame_id) == -1)
  720         goto ignore;
  721 
  722     frame->stream_id = (unsigned int)stream_id;
  723     frame->frame_id  = (unsigned int)frame_id;
  724 
  725     DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
  726           " - %s frame received"
  727           " - frag_len=%u - len=%u - offset=%ld",
  728           client->id, frame->stream_id, frame->frame_id,
  729           (frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented",
  730           frame->frag_len, frame->len, p - frame->buf);
  731 
  732     frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN);
  733     frame->offset = (p - frame->buf);
  734     return acc_payload(frame);
  735 
  736   ignore:
  737     return 0;
  738 
  739   error:
  740     return -1;
  741 }
  742 
  743 /* Decode next part of a fragmented frame received from HAProxy. It returns -1
  744  * if an error occurred, 0 if it must be must be ignored, otherwise the number
  745  * of read bytes. */
  746 static int
  747 handle_hafrag(struct spoe_frame *frame)
  748 {
  749     struct client *client = frame->client;
  750     char          *p, *end;
  751     uint64_t       stream_id, frame_id;
  752 
  753     p = frame->buf;
  754     end = frame->buf + frame->len;
  755 
  756     /* Check frame type */
  757     if (*p++ != SPOE_FRM_T_UNSET)
  758         goto ignore;
  759 
  760     DEBUG(frame->worker, "<%lu> Decode Next part of a fragmented frame", client->id);
  761 
  762     /* Fragmentation is not supported */
  763     if (fragmentation == false) {
  764         client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
  765         goto error;
  766     }
  767 
  768     /* Retrieve flags */
  769     memcpy((char *)&(frame->flags), p, 4);
  770     frame->flags = ntohl(frame->flags);
  771     p+= 4;
  772 
  773     /* Read the stream-id and frame-id */
  774     if (decode_varint(&p, end, &stream_id) == -1)
  775         goto ignore;
  776     if (decode_varint(&p, end, &frame_id) == -1)
  777         goto ignore;
  778 
  779     if (frame->fragmented == false                  ||
  780         frame->stream_id != (unsigned int)stream_id ||
  781         frame->frame_id  != (unsigned int)frame_id) {
  782         client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES;
  783         goto error;
  784     }
  785 
  786     if (frame->flags & SPOE_FRM_FL_ABRT) {
  787         DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
  788               " - Abort processing of a fragmented frame"
  789               " - frag_len=%u - len=%u - offset=%ld",
  790               client->id, frame->stream_id, frame->frame_id,
  791               frame->frag_len, frame->len, p - frame->buf);
  792         goto ignore;
  793     }
  794 
  795     DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
  796           " - %s fragment of a fragmented frame received"
  797           " - frag_len=%u - len=%u - offset=%ld",
  798           client->id, frame->stream_id, frame->frame_id,
  799           (frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next",
  800           frame->frag_len, frame->len, p - frame->buf);
  801 
  802     frame->offset = (p - frame->buf);
  803     return acc_payload(frame);
  804 
  805   ignore:
  806     return 0;
  807 
  808   error:
  809     return -1;
  810 }
  811 
  812 /* Encode a HELLO frame to send it to HAProxy. It returns the number of written
  813  * bytes. */
  814 static int
  815 prepare_agenthello(struct spoe_frame *frame)
  816 {
  817     struct client *client = frame->client;
  818     char          *p, *end;
  819     char           capabilities[64];
  820     int            n;
  821     unsigned int   flags  = SPOE_FRM_FL_FIN;
  822 
  823     DEBUG(frame->worker, "<%lu> Encode Agent HELLO frame", client->id);
  824     frame->type = SPOA_FRM_T_AGENT;
  825 
  826     p   = frame->buf;
  827     end = frame->buf+max_frame_size;
  828 
  829     /* Frame Type */
  830     *p++ = SPOE_FRM_T_AGENT_HELLO;
  831 
  832     /* Set flags */
  833     flags = htonl(flags);
  834     memcpy(p, (char *)&flags, 4);
  835     p += 4;
  836 
  837     /* No stream-id and frame-id for HELLO frames */
  838     *p++ = 0;
  839     *p++ = 0;
  840 
  841     /* "version" K/V item */
  842     spoe_encode_buffer("version", 7, &p, end);
  843     *p++ = SPOE_DATA_T_STR;
  844     spoe_encode_buffer(SPOP_VERSION, SLEN(SPOP_VERSION), &p, end);
  845     DEBUG(frame->worker, "<%lu> Agent version : %s",
  846           client->id, SPOP_VERSION);
  847 
  848 
  849     /* "max-frame-size" K/V item */
  850     spoe_encode_buffer("max-frame-size", 14, &p ,end);
  851     *p++ = SPOE_DATA_T_UINT32;
  852     encode_varint(client->max_frame_size, &p, end);
  853     DEBUG(frame->worker, "<%lu> Agent maximum frame size : %u",
  854           client->id, client->max_frame_size);
  855 
  856     /* "capabilities" K/V item */
  857     spoe_encode_buffer("capabilities", 12, &p, end);
  858     *p++ = SPOE_DATA_T_STR;
  859 
  860     memset(capabilities, 0, sizeof(capabilities));
  861     n = 0;
  862 
  863     /*     1. Fragmentation capability ? */
  864     if (fragmentation == true) {
  865         memcpy(capabilities, "fragmentation", 13);
  866         n += 13;
  867     }
  868     /*     2. Pipelining capability ? */
  869     if (client->pipelining == true) {
  870         if (n) capabilities[n++] = ',';
  871         memcpy(capabilities + n, "pipelining", 10);
  872         n += 10;
  873     }
  874     /*     3. Async capability ? */
  875     if (client->async == true) {
  876         if (n) capabilities[n++] = ',';
  877         memcpy(capabilities + n, "async", 5);
  878         n += 5;
  879     }
  880     spoe_encode_buffer(capabilities, n, &p, end);
  881 
  882     DEBUG(frame->worker, "<%lu> Agent capabilities : %.*s",
  883           client->id, n, capabilities);
  884 
  885     frame->len = (p - frame->buf);
  886     return frame->len;
  887 }
  888 
  889 /* Encode a DISCONNECT frame to send it to HAProxy. It returns the number of
  890  * written bytes. */
  891 static int
  892 prepare_agentdicon(struct spoe_frame *frame)
  893 {
  894     struct client *client = frame->client;
  895     char           *p, *end;
  896     const char     *reason;
  897     int             rlen;
  898     unsigned int    flags  = SPOE_FRM_FL_FIN;
  899 
  900     DEBUG(frame->worker, "<%lu> Encode Agent DISCONNECT frame", client->id);
  901     frame->type = SPOA_FRM_T_AGENT;
  902 
  903     p   = frame->buf;
  904     end = frame->buf+max_frame_size;
  905 
  906     if (client->status_code >= SPOE_FRM_ERRS)
  907         client->status_code = SPOE_FRM_ERR_UNKNOWN;
  908     reason = spoe_frm_err_reasons[client->status_code];
  909     rlen   = strlen(reason);
  910 
  911     /* Frame type */
  912     *p++ = SPOE_FRM_T_AGENT_DISCON;
  913 
  914     /* Set flags */
  915     flags = htonl(flags);
  916     memcpy(p, (char *)&flags, 4);
  917     p += 4;
  918 
  919     /* No stream-id and frame-id for DISCONNECT frames */
  920     *p++ = 0;
  921     *p++ = 0;
  922 
  923     /* There are 2 mandatory items: "status-code" and "message" */
  924 
  925     /* "status-code" K/V item */
  926     spoe_encode_buffer("status-code", 11, &p, end);
  927     *p++ = SPOE_DATA_T_UINT32;
  928     encode_varint(client->status_code, &p, end);
  929     DEBUG(frame->worker, "<%lu> Disconnect status code : %u",
  930           client->id, client->status_code);
  931 
  932     /* "message" K/V item */
  933     spoe_encode_buffer("message", 7, &p, end);
  934     *p++ = SPOE_DATA_T_STR;
  935     spoe_encode_buffer(reason, rlen, &p, end);
  936     DEBUG(frame->worker, "<%lu> Disconnect message : %s",
  937           client->id, reason);
  938 
  939     frame->len = (p - frame->buf);
  940     return frame->len;
  941 }
  942 
  943 /* Encode a ACK frame to send it to HAProxy. It returns the number of written
  944  * bytes. */
  945 static int
  946 prepare_agentack(struct spoe_frame *frame)
  947 {
  948     char        *p, *end;
  949     unsigned int flags  = SPOE_FRM_FL_FIN;
  950 
  951     /* Be careful here, in async mode, frame->client can be NULL */
  952 
  953     DEBUG(frame->worker, "Encode Agent ACK frame");
  954     frame->type = SPOA_FRM_T_AGENT;
  955 
  956     p   = frame->buf;
  957     end = frame->buf+max_frame_size;
  958 
  959     /* Frame type */
  960     *p++ = SPOE_FRM_T_AGENT_ACK;
  961 
  962     /* Set flags */
  963     flags = htonl(flags);
  964     memcpy(p, (char *)&flags, 4);
  965     p += 4;
  966 
  967     /* Set stream-id and frame-id for ACK frames */
  968     encode_varint(frame->stream_id, &p, end);
  969     encode_varint(frame->frame_id, &p, end);
  970 
  971     DEBUG(frame->worker, "STREAM-ID=%u - FRAME-ID=%u",
  972           frame->stream_id, frame->frame_id);
  973 
  974     frame->len = (p - frame->buf);
  975     return frame->len;
  976 }
  977 
  978 static int
  979 create_server_socket(void)
  980 {
  981     struct sockaddr_in listen_addr;
  982     int                fd, yes = 1;
  983 
  984     fd = socket(AF_INET, SOCK_STREAM, 0);
  985     if (fd < 0) {
  986         LOG(&null_worker, "Failed to create service socket : %m");
  987         return -1;
  988     }
  989 
  990     memset(&listen_addr, 0, sizeof(listen_addr));
  991     listen_addr.sin_family = AF_INET;
  992     listen_addr.sin_addr.s_addr = INADDR_ANY;
  993     listen_addr.sin_port = htons(server_port);
  994 
  995     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0 ||
  996         setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) < 0) {
  997         LOG(&null_worker, "Failed to set option on server socket : %m");
  998         return -1;
  999     }
 1000 
 1001     if (bind(fd, (struct sockaddr *) &listen_addr, sizeof(listen_addr)) < 0) {
 1002         LOG(&null_worker, "Failed to bind server socket : %m");
 1003         return -1;
 1004     }
 1005 
 1006     if (listen(fd, CONNECTION_BACKLOG) < 0) {
 1007         LOG(&null_worker, "Failed to listen on server socket : %m");
 1008         return -1;
 1009     }
 1010 
 1011     return fd;
 1012 }
 1013 
 1014 static void
 1015 release_frame(struct spoe_frame *frame)
 1016 {
 1017     struct worker *worker;
 1018 
 1019     if (frame == NULL)
 1020         return;
 1021 
 1022     if (event_pending(&frame->process_frame_event, EV_TIMEOUT, NULL))
 1023         event_del(&frame->process_frame_event);
 1024 
 1025     worker = frame->worker;
 1026     LIST_DEL(&frame->list);
 1027     if (frame->frag_buf)
 1028         free(frame->frag_buf);
 1029     memset(frame, 0, sizeof(*frame)+max_frame_size+4);
 1030     LIST_ADDQ(&worker->frames, &frame->list);
 1031 }
 1032 
 1033 static void
 1034 release_client(struct client *c)
 1035 {
 1036     struct spoe_frame *frame, *back;
 1037 
 1038     if (c == NULL)
 1039         return;
 1040 
 1041     DEBUG(c->worker, "<%lu> Release client", c->id);
 1042 
 1043     LIST_DEL(&c->by_worker);
 1044     c->worker->nbclients--;
 1045 
 1046     unuse_spoe_engine(c);
 1047     free(c->engine_id);
 1048 
 1049     if (event_pending(&c->read_frame_event, EV_READ, NULL))
 1050         event_del(&c->read_frame_event);
 1051     if (event_pending(&c->write_frame_event, EV_WRITE, NULL))
 1052         event_del(&c->write_frame_event);
 1053 
 1054     release_frame(c->incoming_frame);
 1055     release_frame(c->outgoing_frame);
 1056     list_for_each_entry_safe(frame, back, &c->processing_frames, list) {
 1057         release_frame(frame);
 1058     }
 1059     list_for_each_entry_safe(frame, back, &c->outgoing_frames, list) {
 1060         release_frame(frame);
 1061     }
 1062 
 1063     if (c->fd >= 0)
 1064         close(c->fd);
 1065 
 1066     free(c);
 1067 }
 1068 
 1069 static void
 1070 reset_frame(struct spoe_frame *frame)
 1071 {
 1072     if (frame == NULL)
 1073         return;
 1074 
 1075     if (frame->frag_buf)
 1076         free(frame->frag_buf);
 1077 
 1078     frame->type       = SPOA_FRM_T_UNKNOWN;
 1079     frame->buf        = (char *)(frame->data);
 1080     frame->offset     = 0;
 1081     frame->len        = 0;
 1082     frame->stream_id  = 0;
 1083     frame->frame_id   = 0;
 1084     frame->flags      = 0;
 1085     frame->hcheck     = false;
 1086     frame->fragmented = false;
 1087     frame->ip_score   = -1;
 1088     frame->frag_buf   = NULL;
 1089     frame->frag_len   = 0;
 1090     LIST_INIT(&frame->list);
 1091 }
 1092 
 1093 static void
 1094 use_spoe_engine(struct client *client)
 1095 {
 1096     struct spoe_engine *eng;
 1097 
 1098     if (client->engine_id == NULL)
 1099         return;
 1100 
 1101     list_for_each_entry(eng, &client->worker->engines, list) {
 1102         if (!strcmp(eng->id, client->engine_id))
 1103             goto end;
 1104     }
 1105 
 1106     if ((eng = malloc(sizeof(*eng))) == NULL) {
 1107         client->async = false;
 1108         return;
 1109     }
 1110 
 1111     eng->id = strdup(client->engine_id);
 1112     LIST_INIT(&eng->clients);
 1113     LIST_INIT(&eng->processing_frames);
 1114     LIST_INIT(&eng->outgoing_frames);
 1115     LIST_ADDQ(&client->worker->engines, &eng->list);
 1116     LOG(client->worker, "Add new SPOE engine '%s'", eng->id);
 1117 
 1118   end:
 1119     client->engine = eng;
 1120     LIST_ADDQ(&eng->clients, &client->by_engine);
 1121 }
 1122 
 1123 static void
 1124 unuse_spoe_engine(struct client *client)
 1125 {
 1126     struct spoe_engine *eng;
 1127     struct spoe_frame  *frame, *back;
 1128 
 1129     if (client == NULL || client->engine == NULL)
 1130         return;
 1131 
 1132     eng = client->engine;
 1133     client->engine = NULL;
 1134     LIST_DEL(&client->by_engine);
 1135     if (!LIST_ISEMPTY(&eng->clients))
 1136         return;
 1137 
 1138     LOG(client->worker, "Remove SPOE engine '%s'", eng->id);
 1139     LIST_DEL(&eng->list);
 1140 
 1141     list_for_each_entry_safe(frame, back, &eng->processing_frames, list) {
 1142         release_frame(frame);
 1143     }
 1144     list_for_each_entry_safe(frame, back, &eng->outgoing_frames, list) {
 1145         release_frame(frame);
 1146     }
 1147     free(eng->id);
 1148     free(eng);
 1149 }
 1150 
 1151 
 1152 static struct spoe_frame *
 1153 acquire_incoming_frame(struct client *client)
 1154 {
 1155     struct spoe_frame *frame;
 1156 
 1157     frame = client->incoming_frame;
 1158     if (frame != NULL)
 1159         return frame;
 1160 
 1161     if (LIST_ISEMPTY(&client->worker->frames)) {
 1162         if ((frame = calloc(1, sizeof(*frame)+max_frame_size+4)) == NULL) {
 1163             LOG(client->worker, "Failed to allocate new frame : %m");
 1164             return NULL;
 1165         }
 1166     }
 1167     else {
 1168         frame = LIST_NEXT(&client->worker->frames, typeof(frame), list);
 1169         LIST_DEL(&frame->list);
 1170     }
 1171 
 1172     reset_frame(frame);
 1173     frame->worker = client->worker;
 1174     frame->engine = client->engine;
 1175     frame->client = client;
 1176 
 1177     if (event_assign(&frame->process_frame_event, client->worker->base, -1,
 1178              EV_TIMEOUT|EV_PERSIST, process_frame_cb, frame) < 0) {
 1179         LOG(client->worker, "Failed to create frame event");
 1180         return NULL;
 1181     }
 1182 
 1183     client->incoming_frame = frame;
 1184     return frame;
 1185 }
 1186 
 1187 static struct spoe_frame *
 1188 acquire_outgoing_frame(struct client *client)
 1189 {
 1190     struct spoe_engine *engine = client->engine;
 1191     struct spoe_frame  *frame = NULL;
 1192 
 1193     if (client->outgoing_frame != NULL)
 1194         frame = client->outgoing_frame;
 1195     else if (!LIST_ISEMPTY(&client->outgoing_frames)) {
 1196         frame = LIST_NEXT(&client->outgoing_frames, typeof(frame), list);
 1197         LIST_DEL(&frame->list);
 1198         client->outgoing_frame = frame;
 1199     }
 1200     else if (engine!= NULL && !LIST_ISEMPTY(&engine->outgoing_frames)) {
 1201         frame = LIST_NEXT(&engine->outgoing_frames, typeof(frame), list);
 1202         LIST_DEL(&frame->list);
 1203         client->outgoing_frame = frame;
 1204     }
 1205     return frame;
 1206 }
 1207 
 1208 static void
 1209 write_frame(struct client *client, struct spoe_frame *frame)
 1210 {
 1211     uint32_t netint;
 1212 
 1213     LIST_DEL(&frame->list);
 1214 
 1215     frame->buf    = (char *)(frame->data);
 1216     frame->offset = 0;
 1217     netint        = htonl(frame->len);
 1218     memcpy(frame->buf, &netint, 4);
 1219 
 1220     if (client != NULL) { /* HELLO or DISCONNECT frames */
 1221         event_add(&client->write_frame_event, NULL);
 1222 
 1223         /* Try to process the frame as soon as possible, and always
 1224          * attach it to the client */
 1225         if (client->async || client->pipelining) {
 1226             if (client->outgoing_frame == NULL)
 1227                 client->outgoing_frame = frame;
 1228             else
 1229                 LIST_ADD(&client->outgoing_frames, &frame->list);
 1230         }
 1231         else {
 1232             client->outgoing_frame = frame;
 1233             event_del(&client->read_frame_event);
 1234         }
 1235     }
 1236     else { /* for all other frames */
 1237         if (frame->client == NULL) { /* async mode ! */
 1238             LIST_ADDQ(&frame->engine->outgoing_frames, &frame->list);
 1239             list_for_each_entry(client, &frame->engine->clients, by_engine)
 1240                 event_add(&client->write_frame_event, NULL);
 1241         }
 1242         else if (frame->client->pipelining) {
 1243             LIST_ADDQ(&frame->client->outgoing_frames, &frame->list);
 1244             event_add(&frame->client->write_frame_event, NULL);
 1245         }
 1246         else {
 1247             frame->client->outgoing_frame = frame;
 1248             event_add(&frame->client->write_frame_event, NULL);
 1249             event_del(&frame->client->read_frame_event);
 1250         }
 1251     }
 1252 }
 1253 
 1254 static void
 1255 process_incoming_frame(struct spoe_frame *frame)
 1256 {
 1257     struct client *client = frame->client;
 1258 
 1259     if (event_add(&frame->process_frame_event, &processing_delay) < 0) {
 1260         LOG(client->worker, "Failed to process incoming frame");
 1261         release_frame(frame);
 1262         return;
 1263     }
 1264 
 1265     if (client->async) {
 1266         frame->client = NULL;
 1267         LIST_ADDQ(&frame->engine->processing_frames, &frame->list);
 1268     }
 1269     else if (client->pipelining)
 1270         LIST_ADDQ(&client->processing_frames, &frame->list);
 1271     else
 1272         event_del(&client->read_frame_event);
 1273 }
 1274 
 1275 static void
 1276 signal_cb(evutil_socket_t sig, short events, void *user_data)
 1277 {
 1278     struct event_base *base = user_data;
 1279     int                i;
 1280 
 1281     DEBUG(&null_worker, "Stopping the server");
 1282 
 1283     event_base_loopbreak(base);
 1284     DEBUG(&null_worker, "Main event loop stopped");
 1285 
 1286     for (i = 0; i < num_workers; i++) {
 1287         event_base_loopbreak(workers[i].base);
 1288         DEBUG(&null_worker, "Event loop stopped for worker %02d",
 1289               workers[i].id);
 1290     }
 1291 }
 1292 
 1293 static void
 1294 worker_monitor_cb(evutil_socket_t fd, short events, void *arg)
 1295 {
 1296     struct worker *worker = arg;
 1297 
 1298     LOG(worker, "%u clients connected (%u frames)", worker->nbclients, worker->nbframes);
 1299 }
 1300 
 1301 static void
 1302 process_frame_cb(evutil_socket_t fd, short events, void *arg)
 1303 {
 1304     struct spoe_frame *frame  = arg;
 1305     char              *p, *end;
 1306     int                ret;
 1307 
 1308     DEBUG(frame->worker,
 1309           "Process frame messages : STREAM-ID=%u - FRAME-ID=%u - length=%u bytes",
 1310           frame->stream_id, frame->frame_id, frame->len - frame->offset);
 1311 
 1312     p   = frame->buf + frame->offset;
 1313     end = frame->buf + frame->len;
 1314 
 1315     /* Loop on messages */
 1316     while (p < end) {
 1317         char    *str;
 1318         uint64_t sz;
 1319         int      nbargs;
 1320 
 1321         /* Decode the message name */
 1322         spoe_decode_buffer(&p, end, &str, &sz);
 1323         if (!str)
 1324             goto stop_processing;
 1325 
 1326         DEBUG(frame->worker, "Process SPOE Message '%.*s'", (int)sz, str);
 1327 
 1328         nbargs = (unsigned char)*p++;      /* Get the number of arguments */
 1329         frame->offset = (p - frame->buf);  /* Save index to handle errors and skip args */
 1330         if (!memcmp(str, "check-client-ip", sz)) {
 1331             union spoe_data data;
 1332             enum spoe_data_type type;
 1333 
 1334             if (nbargs != 1)
 1335                 goto skip_message;
 1336 
 1337             if (spoe_decode_buffer(&p, end, &str, &sz) == -1)
 1338                 goto stop_processing;
 1339             if (spoe_decode_data(&p, end, &data, &type) == -1)
 1340                 goto skip_message;
 1341             frame->worker->nbframes++;
 1342             if (type == SPOE_DATA_T_IPV4)
 1343                 check_ipv4_reputation(frame, &data.ipv4);
 1344             if (type == SPOE_DATA_T_IPV6)
 1345                 check_ipv6_reputation(frame, &data.ipv6);
 1346         }
 1347         else {
 1348           skip_message:
 1349             p = frame->buf + frame->offset; /* Restore index */
 1350 
 1351             while (nbargs-- > 0) {
 1352                 /* Silently ignore argument: its name and its value */
 1353                 if (spoe_decode_buffer(&p, end, &str, &sz) == -1)
 1354                     goto stop_processing;
 1355                 if (spoe_skip_data(&p, end) == -1)
 1356                     goto stop_processing;
 1357             }
 1358         }
 1359     }
 1360 
 1361   stop_processing:
 1362     /* Prepare agent ACK frame */
 1363     frame->buf    = (char *)(frame->data) + 4;
 1364     frame->offset = 0;
 1365     frame->len    = 0;
 1366     frame->flags  = 0;
 1367 
 1368     ret = prepare_agentack(frame);
 1369     p   = frame->buf + ret;
 1370     end = frame->buf+max_frame_size;
 1371 
 1372     if (frame->ip_score != -1) {
 1373         DEBUG(frame->worker, "Add action : set variable ip_scode=%u",
 1374               frame->ip_score);
 1375 
 1376         *p++ = SPOE_ACT_T_SET_VAR;                     /* Action type */
 1377         *p++ = 3;                                      /* Number of args */
 1378         *p++ = SPOE_SCOPE_SESS;                        /* Arg 1: the scope */
 1379         spoe_encode_buffer("ip_score", 8, &p, end);    /* Arg 2: variable name */
 1380         *p++ = SPOE_DATA_T_UINT32;
 1381         encode_varint(frame->ip_score, &p, end); /* Arg 3: variable value */
 1382         frame->len = (p - frame->buf);
 1383     }
 1384     write_frame(NULL, frame);
 1385 }
 1386 
 1387 static void
 1388 read_frame_cb(evutil_socket_t fd, short events, void *arg)
 1389 {
 1390     struct client     *client = arg;
 1391     struct spoe_frame *frame;
 1392     uint32_t           netint;
 1393     int                n;
 1394 
 1395     DEBUG(client->worker, "<%lu> %s", client->id, __FUNCTION__);
 1396     if ((frame = acquire_incoming_frame(client)) == NULL)
 1397         goto close;
 1398 
 1399     frame->type = SPOA_FRM_T_HAPROXY;
 1400     if (frame->buf == (char *)(frame->data)) {
 1401         /* Read the frame length: frame->buf points on length part (frame->data) */
 1402         n = read(client->fd, frame->buf+frame->offset, 4-frame->offset);
 1403         if (n <= 0) {
 1404             if (n < 0)
 1405                 LOG(client->worker, "Failed to read frame length : %m");
 1406             goto close;
 1407         }
 1408         frame->offset += n;
 1409         if (frame->offset != 4)
 1410             return;
 1411         memcpy(&netint, frame->buf, 4);
 1412         frame->buf   += 4;
 1413         frame->offset = 0;
 1414         frame->len    = ntohl(netint);
 1415     }
 1416 
 1417     /* Read the frame: frame->buf points on frame part (frame->data+4)*/
 1418     n = read(client->fd, frame->buf + frame->offset,
 1419          frame->len - frame->offset);
 1420     if (n <= 0) {
 1421         if (n < 0) {
 1422             LOG(client->worker, "Frame to read frame : %m");
 1423             goto close;
 1424         }
 1425         return;
 1426     }
 1427     frame->offset += n;
 1428     if (frame->offset != frame->len)
 1429         return;
 1430     frame->offset = 0;
 1431 
 1432     DEBUG(client->worker, "<%lu> New Frame of %u bytes received",
 1433           client->id, frame->len);
 1434 
 1435     switch (client->state) {
 1436         case SPOA_ST_CONNECTING:
 1437             if (handle_hahello(frame) < 0) {
 1438                 LOG(client->worker, "Failed to decode HELLO frame");
 1439                 goto disconnect;
 1440             }
 1441             prepare_agenthello(frame);
 1442             goto write_frame;
 1443 
 1444         case SPOA_ST_PROCESSING:
 1445             if (frame->buf[0] == SPOE_FRM_T_HAPROXY_DISCON) {
 1446                 client->state = SPOA_ST_DISCONNECTING;
 1447                 goto disconnecting;
 1448             }
 1449             if (frame->buf[0] == SPOE_FRM_T_UNSET)
 1450                 n = handle_hafrag(frame);
 1451             else
 1452                 n = handle_hanotify(frame);
 1453 
 1454             if (n < 0) {
 1455                 LOG(client->worker, "Failed to decode frame: %s",
 1456                     spoe_frm_err_reasons[client->status_code]);
 1457                 goto disconnect;
 1458             }
 1459             else if (n == 0) {
 1460                 LOG(client->worker, "Ignore invalid/unknown/aborted frame");
 1461                 goto ignore_frame;
 1462             }
 1463             else if (n == 1)
 1464                 goto noop;
 1465             else
 1466                 goto process_frame;
 1467 
 1468         case SPOA_ST_DISCONNECTING:
 1469           disconnecting:
 1470             if (handle_hadiscon(frame) < 0) {
 1471                 LOG(client->worker, "Failed to decode DISCONNECT frame");
 1472                 goto disconnect;
 1473             }
 1474             if (client->status_code != SPOE_FRM_ERR_NONE)
 1475                 LOG(client->worker, "<%lu> Peer closed connection: %s",
 1476                     client->id, spoe_frm_err_reasons[client->status_code]);
 1477             goto disconnect;
 1478     }
 1479 
 1480   noop:
 1481     return;
 1482 
 1483   ignore_frame:
 1484     reset_frame(frame);
 1485     return;
 1486 
 1487   process_frame:
 1488     process_incoming_frame(frame);
 1489     client->incoming_frame = NULL;
 1490     return;
 1491 
 1492   write_frame:
 1493     write_frame(client, frame);
 1494     client->incoming_frame = NULL;
 1495     return;
 1496 
 1497   disconnect:
 1498     client->state = SPOA_ST_DISCONNECTING;
 1499     if (prepare_agentdicon(frame) < 0) {
 1500         LOG(client->worker, "Failed to encode DISCONNECT frame");
 1501         goto close;
 1502     }
 1503     goto write_frame;
 1504 
 1505   close:
 1506     release_client(client);
 1507 }
 1508 
 1509 static void
 1510 write_frame_cb(evutil_socket_t fd, short events, void *arg)
 1511 {
 1512     struct client     *client = arg;
 1513     struct spoe_frame *frame;
 1514     int                n;
 1515 
 1516     DEBUG(client->worker, "<%lu> %s", client->id, __FUNCTION__);
 1517     if ((frame = acquire_outgoing_frame(client)) == NULL) {
 1518         event_del(&client->write_frame_event);
 1519         return;
 1520     }
 1521 
 1522     if (frame->buf == (char *)(frame->data)) {
 1523         /* Write the frame length: frame->buf points on length part (frame->data) */
 1524         n = write(client->fd, frame->buf+frame->offset, 4-frame->offset);
 1525         if (n <= 0) {
 1526             if (n < 0)
 1527                 LOG(client->worker, "Failed to write frame length : %m");
 1528             goto close;
 1529         }
 1530         frame->offset += n;
 1531         if (frame->offset != 4)
 1532             return;
 1533         frame->buf   += 4;
 1534         frame->offset = 0;
 1535     }
 1536 
 1537     /* Write the frame: frame->buf points on frame part (frame->data+4)*/
 1538     n = write(client->fd, frame->buf + frame->offset,
 1539           frame->len - frame->offset);
 1540     if (n <= 0) {
 1541         if (n < 0) {
 1542             LOG(client->worker, "Failed to write frame : %m");
 1543             goto close;
 1544         }
 1545         return;
 1546     }
 1547     frame->offset += n;
 1548     if (frame->offset != frame->len)
 1549         return;
 1550 
 1551     DEBUG(client->worker, "<%lu> Frame of %u bytes send",
 1552           client->id, frame->len);
 1553 
 1554     switch (client->state) {
 1555         case SPOA_ST_CONNECTING:
 1556             if (frame->hcheck == true) {
 1557                 DEBUG(client->worker,
 1558                       "<%lu> Close client after healthcheck",
 1559                       client->id);
 1560                 goto close;
 1561             }
 1562             client->state = SPOA_ST_PROCESSING;
 1563             break;
 1564 
 1565         case SPOA_ST_PROCESSING:
 1566             break;
 1567 
 1568         case SPOA_ST_DISCONNECTING:
 1569             goto close;
 1570     }
 1571 
 1572     release_frame(frame);
 1573     client->outgoing_frame = NULL;
 1574     if (!client->async && !client->pipelining) {
 1575         event_del(&client->write_frame_event);
 1576         event_add(&client->read_frame_event, NULL);
 1577     }
 1578     return;
 1579 
 1580   close:
 1581     release_client(client);
 1582 }
 1583 
 1584 static void
 1585 accept_cb(int listener, short event, void *arg)
 1586 {
 1587     struct worker     *worker;
 1588     struct client     *client;
 1589     int                fd;
 1590 
 1591     worker = &workers[clicount++ % num_workers];
 1592 
 1593     if ((fd = accept(listener, NULL, NULL)) < 0) {
 1594         if (errno != EAGAIN && errno != EWOULDBLOCK)
 1595             LOG(worker, "Failed to accept client connection : %m");
 1596         return;
 1597     }
 1598 
 1599     DEBUG(&null_worker,
 1600           "<%lu> New Client connection accepted and assigned to worker %02d",
 1601           clicount, worker->id);
 1602 
 1603     if (evutil_make_socket_nonblocking(fd) < 0) {
 1604         LOG(&null_worker, "Failed to set client socket to non-blocking : %m");
 1605         close(fd);
 1606         return;
 1607     }
 1608 
 1609     if ((client = calloc(1, sizeof(*client))) == NULL) {
 1610         LOG(&null_worker, "Failed to allocate memory for client state : %m");
 1611         close(fd);
 1612         return;
 1613     }
 1614 
 1615     client->id             = clicount;
 1616     client->fd             = fd;
 1617     client->worker         = worker;
 1618     client->state          = SPOA_ST_CONNECTING;
 1619     client->status_code    = SPOE_FRM_ERR_NONE;
 1620     client->max_frame_size = max_frame_size;
 1621     client->engine         = NULL;
 1622     client->pipelining     = false;
 1623     client->async          = false;
 1624     client->incoming_frame = NULL;
 1625     client->outgoing_frame = NULL;
 1626     LIST_INIT(&client->processing_frames);
 1627     LIST_INIT(&client->outgoing_frames);
 1628 
 1629     LIST_ADDQ(&worker->clients, &client->by_worker);
 1630 
 1631     worker->nbclients++;
 1632 
 1633     if (event_assign(&client->read_frame_event, worker->base, fd,
 1634              EV_READ|EV_PERSIST, read_frame_cb, client) < 0     ||
 1635         event_assign(&client->write_frame_event, worker->base, fd,
 1636              EV_WRITE|EV_PERSIST, write_frame_cb, client) < 0) {
 1637         LOG(&null_worker, "Failed to create client events");
 1638         release_client(client);
 1639         return;
 1640     }
 1641     event_add(&client->read_frame_event,  NULL);
 1642 }
 1643 
 1644 static void *
 1645 worker_function(void *data)
 1646 {
 1647     struct client     *client, *cback;
 1648     struct spoe_frame *frame, *fback;
 1649     struct worker     *worker = data;
 1650 
 1651     DEBUG(worker, "Worker ready to process client messages");
 1652     event_base_dispatch(worker->base);
 1653 
 1654     list_for_each_entry_safe(client, cback, &worker->clients, by_worker) {
 1655         release_client(client);
 1656     }
 1657 
 1658     list_for_each_entry_safe(frame, fback, &worker->frames, list) {
 1659         LIST_DEL(&frame->list);
 1660         free(frame);
 1661     }
 1662 
 1663     event_free(worker->monitor_event);
 1664     event_base_free(worker->base);
 1665     DEBUG(worker, "Worker is stopped");
 1666     pthread_exit(&null_worker);
 1667 }
 1668 
 1669 
 1670 static int
 1671 parse_processing_delay(const char *str)
 1672 {
 1673         unsigned long value;
 1674 
 1675         value = 0;
 1676         while (1) {
 1677                 unsigned int j;
 1678 
 1679                 j = *str - '0';
 1680                 if (j > 9)
 1681                         break;
 1682                 str++;
 1683                 value *= 10;
 1684                 value += j;
 1685         }
 1686 
 1687         switch (*str) {
 1688         case '\0': /* no unit = millisecond */
 1689             value *= 1000;
 1690             break;
 1691         case 's': /* second */
 1692             value *= 1000000;
 1693             str++;
 1694             break;
 1695         case 'm': /* millisecond : "ms" */
 1696             if (str[1] != 's')
 1697                 return -1;
 1698             value *= 1000;
 1699             str += 2;
 1700             break;
 1701         case 'u': /* microsecond : "us" */
 1702             if (str[1] != 's')
 1703                 return -1;
 1704             str += 2;
 1705             break;
 1706         default:
 1707             return -1;
 1708         }
 1709     if (*str)
 1710         return -1;
 1711 
 1712     processing_delay.tv_sec = (time_t)(value / 1000000);
 1713     processing_delay.tv_usec = (suseconds_t)(value % 1000000);
 1714         return 0;
 1715 }
 1716 
 1717 
 1718 static void
 1719 usage(char *prog)
 1720 {
 1721     fprintf(stderr,
 1722         "Usage : %s [OPTION]...\n"
 1723         "    -h                   Print this message\n"
 1724         "    -d                   Enable the debug mode\n"
 1725         "    -m <max-frame-size>  Specify the maximum frame size (default : %u)\n"
 1726         "    -p <port>            Specify the port to listen on (default : %d)\n"
 1727         "    -n <num-workers>     Specify the number of workers (default : %d)\n"
 1728         "    -c <capability>      Enable the support of the specified capability\n"
 1729         "    -t <time>            Set a delay to process a message (default: 0)\n"
 1730         "                           The value is specified in milliseconds by default,\n"
 1731         "                           but can be in any other unit if the number is suffixed\n"
 1732         "                           by a unit (us, ms, s)\n"
 1733         "\n"
 1734         "    Supported capabilities: fragmentation, pipelining, async\n",
 1735         prog, MAX_FRAME_SIZE, DEFAULT_PORT, NUM_WORKERS);
 1736 }
 1737 
 1738 int
 1739 main(int argc, char **argv)
 1740 {
 1741     struct event_base *base = NULL;
 1742     struct event      *signal_event = NULL, *accept_event = NULL;
 1743     int                opt, i, fd = -1;
 1744 
 1745     // TODO: add '-t <processing-time>' option
 1746     while ((opt = getopt(argc, argv, "hdm:n:p:c:t:")) != -1) {
 1747         switch (opt) {
 1748             case 'h':
 1749                 usage(argv[0]);
 1750                 return EXIT_SUCCESS;
 1751             case 'd':
 1752                 debug = true;
 1753                 break;
 1754             case 'm':
 1755                 max_frame_size = atoi(optarg);
 1756                 break;
 1757             case 'n':
 1758                 num_workers = atoi(optarg);
 1759                 break;
 1760             case 'p':
 1761                 server_port = atoi(optarg);
 1762                 break;
 1763             case 'c':
 1764                 if (!strcmp(optarg, "pipelining"))
 1765                     pipelining = true;
 1766                 else if (!strcmp(optarg, "async"))
 1767                     async = true;
 1768                 else if (!strcmp(optarg, "fragmentation"))
 1769                     fragmentation = true;
 1770                 else
 1771                     fprintf(stderr, "WARNING: unsupported capability '%s'\n", optarg);
 1772                 break;
 1773             case 't':
 1774                 if (!parse_processing_delay(optarg))
 1775                     break;
 1776                 fprintf(stderr, "%s: failed to parse time '%s'.\n", argv[0], optarg);
 1777                 fprintf(stderr, "Try '%s -h' for more information.\n", argv[0]);
 1778                 return EXIT_FAILURE;
 1779             default:
 1780                 usage(argv[0]);
 1781                 return EXIT_FAILURE;
 1782         }
 1783     }
 1784 
 1785     if (num_workers <= 0) {
 1786         LOG(&null_worker, "%s : Invalid number of workers '%d'\n",
 1787             argv[0], num_workers);
 1788         goto error;
 1789     }
 1790 
 1791     if (server_port <= 0) {
 1792         LOG(&null_worker, "%s : Invalid port '%d'\n",
 1793             argv[0], server_port);
 1794         goto error;
 1795     }
 1796 
 1797 
 1798     if (evthread_use_pthreads() < 0) {
 1799         LOG(&null_worker, "No pthreads support for libevent");
 1800         goto error;
 1801     }
 1802 
 1803     if ((base = event_base_new()) == NULL) {
 1804         LOG(&null_worker, "Failed to initialize libevent : %m");
 1805         goto error;
 1806     }
 1807 
 1808     signal(SIGPIPE, SIG_IGN);
 1809 
 1810     if ((fd = create_server_socket()) < 0) {
 1811         LOG(&null_worker, "Failed to create server socket");
 1812         goto error;
 1813     }
 1814     if (evutil_make_socket_nonblocking(fd) < 0) {
 1815         LOG(&null_worker, "Failed to set server socket to non-blocking");
 1816         goto error;
 1817     }
 1818 
 1819     if ((workers = calloc(num_workers, sizeof(*workers))) == NULL) {
 1820         LOG(&null_worker, "Failed to set allocate memory for workers");
 1821         goto error;
 1822     }
 1823 
 1824     for (i = 0; i < num_workers; ++i) {
 1825         struct worker *w = &workers[i];
 1826 
 1827         w->id        = i+1;
 1828         w->nbclients = 0;
 1829         w->nbframes  = 0;
 1830         LIST_INIT(&w->engines);
 1831         LIST_INIT(&w->clients);
 1832         LIST_INIT(&w->frames);
 1833 
 1834         if ((w->base = event_base_new()) == NULL) {
 1835             LOG(&null_worker,
 1836                 "Failed to initialize libevent for worker %02d : %m",
 1837                 w->id);
 1838             goto error;
 1839         }
 1840 
 1841         w->monitor_event = event_new(w->base, fd, EV_PERSIST,
 1842                          worker_monitor_cb, (void *)w);
 1843         if (w->monitor_event == NULL ||
 1844             event_add(w->monitor_event, (struct timeval[]){{5,0}}) < 0) {
 1845             LOG(&null_worker,
 1846                 "Failed to create monitor event for worker %02d",
 1847                 w->id);
 1848             goto error;
 1849         }
 1850 
 1851         if (pthread_create(&w->thread, NULL, worker_function, (void *)w)) {
 1852             LOG(&null_worker,
 1853                 "Failed to start thread for worker %02d : %m",
 1854                 w->id);
 1855         }
 1856         DEBUG(&null_worker, "Worker %02d initialized", w->id);
 1857     }
 1858 
 1859     accept_event = event_new(base, fd, EV_READ|EV_PERSIST, accept_cb,
 1860                  (void *)base);
 1861     if (accept_event == NULL || event_add(accept_event, NULL) < 0) {
 1862         LOG(&null_worker, "Failed to create accept event : %m");
 1863     }
 1864 
 1865     signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);
 1866     if (signal_event == NULL || event_add(signal_event, NULL) < 0) {
 1867         LOG(&null_worker, "Failed to create signal event : %m");
 1868     }
 1869 
 1870     DEBUG(&null_worker,
 1871           "Server is ready"
 1872           " [fragmentation=%s - pipelining=%s - async=%s - debug=%s - max-frame-size=%u]",
 1873           (fragmentation?"true":"false"), (pipelining?"true":"false"), (async?"true":"false"),
 1874           (debug?"true":"false"), max_frame_size);
 1875     event_base_dispatch(base);
 1876 
 1877     for (i = 0; i < num_workers; i++) {
 1878         struct worker *w = &workers[i];
 1879 
 1880         pthread_join(w->thread, NULL);
 1881         DEBUG(&null_worker, "Worker %02d terminated", w->id);
 1882     }
 1883 
 1884     free(workers);
 1885     event_free(signal_event);
 1886     event_free(accept_event);
 1887     event_base_free(base);
 1888     close(fd);
 1889     return EXIT_SUCCESS;
 1890 
 1891   error:
 1892     if (workers != NULL)
 1893         free(workers);
 1894     if (signal_event != NULL)
 1895         event_free(signal_event);
 1896     if (accept_event != NULL)
 1897         event_free(accept_event);
 1898     if (base != NULL)
 1899         event_base_free(base);
 1900     if (fd != -1)
 1901         close(fd);
 1902     return EXIT_FAILURE;
 1903 }