"Fossies" - the Fresh Open Source Software Archive

Member "haproxy-2.0.9/src/flt_spoe.c" (15 Nov 2019, 137258 Bytes) of package /linux/misc/haproxy-2.0.9.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "flt_spoe.c" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 2.0.8_vs_2.0.9.

    1 /*
    2  * Stream processing offload engine management.
    3  *
    4  * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
    5  *
    6  * This program is free software; you can redistribute it and/or
    7  * modify it under the terms of the GNU General Public License
    8  * as published by the Free Software Foundation; either version
    9  * 2 of the License, or (at your option) any later version.
   10  *
   11  */
#include <ctype.h>
#include <errno.h>
#include <limits.h>

#include <common/cfgparse.h>
#include <common/compat.h>
#include <common/config.h>
#include <common/debug.h>
#include <common/hathreads.h>
#include <common/initcall.h>
#include <common/memory.h>
#include <common/time.h>

#include <types/arg.h>
#include <types/global.h>
#include <types/spoe.h>

#include <proto/acl.h>
#include <proto/action.h>
#include <proto/arg.h>
#include <proto/backend.h>
#include <proto/filters.h>
#include <proto/freq_ctr.h>
#include <proto/frontend.h>
#include <proto/http_rules.h>
#include <proto/log.h>
#include <proto/proto_http.h>
#include <proto/proxy.h>
#include <proto/sample.h>
#include <proto/session.h>
#include <proto/signal.h>
#include <proto/spoe.h>
#include <proto/stream.h>
#include <proto/stream_interface.h>
#include <proto/task.h>
#include <proto/tcp_rules.h>
#include <proto/vars.h>
   48 
   49 #if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
   50 #define SPOE_PRINTF(x...) fprintf(x)
   51 #define SPOE_DEBUG_STMT(statement) statement
   52 #else
   53 #define SPOE_PRINTF(x...)
   54 #define SPOE_DEBUG_STMT(statement)
   55 #endif
   56 
   57 /* Reserved 4 bytes to the frame size. So a frame and its size can be written
   58  * together in a buffer */
   59 #define MAX_FRAME_SIZE     global.tune.bufsize - 4
   60 
   61 /* The minimum size for a frame */
   62 #define MIN_FRAME_SIZE     256
   63 
   64 /* Reserved for the metadata and the frame type.
   65  * So <MAX_FRAME_SIZE> - <FRAME_HDR_SIZE> is the maximum payload size */
   66 #define FRAME_HDR_SIZE     32
   67 
   68 /* Helper to get SPOE ctx inside an appctx */
   69 #define SPOE_APPCTX(appctx) ((struct spoe_appctx *)((appctx)->ctx.spoe.ptr))
   70 
   71 /* SPOE filter id. Used to identify SPOE filters */
   72 const char *spoe_filter_id = "SPOE filter";
   73 
   74 /* Set if the handle on SIGUSR1 is registered */
   75 static int sighandler_registered = 0;
   76 
   77 /* proxy used during the parsing */
   78 struct proxy *curproxy = NULL;
   79 
   80 /* The name of the SPOE engine, used during the parsing */
   81 char *curengine = NULL;
   82 
   83 /* SPOE agent used during the parsing */
   84 /* SPOE agent/group/message used during the parsing */
   85 struct spoe_agent   *curagent = NULL;
   86 struct spoe_group   *curgrp   = NULL;
   87 struct spoe_message *curmsg   = NULL;
   88 
   89 /* list of SPOE messages and placeholders used during the parsing */
   90 struct list curmsgs;
   91 struct list curgrps;
   92 struct list curmphs;
   93 struct list curgphs;
   94 struct list curvars;
   95 
   96 /* list of log servers used during the parsing */
   97 struct list curlogsrvs;
   98 
   99 /* agent's proxy flags (PR_O_* and PR_O2_*) used during parsing */
  100 int curpxopts;
  101 int curpxopts2;
  102 
  103 /* Pools used to allocate SPOE structs */
  104 DECLARE_STATIC_POOL(pool_head_spoe_ctx,    "spoe_ctx",    sizeof(struct spoe_context));
  105 DECLARE_STATIC_POOL(pool_head_spoe_appctx, "spoe_appctx", sizeof(struct spoe_appctx));
  106 
  107 struct flt_ops spoe_ops;
  108 
  109 static int  spoe_queue_context(struct spoe_context *ctx);
  110 static int  spoe_acquire_buffer(struct buffer *buf, struct buffer_wait *buffer_wait);
  111 static void spoe_release_buffer(struct buffer *buf, struct buffer_wait *buffer_wait);
  112 
  113 /********************************************************************
  114  * helper functions/globals
  115  ********************************************************************/
  116 static void
  117 spoe_release_placeholder(struct spoe_placeholder *ph)
  118 {
  119     if (!ph)
  120         return;
  121     free(ph->id);
  122     free(ph);
  123 }
  124 
  125 static void
  126 spoe_release_message(struct spoe_message *msg)
  127 {
  128     struct spoe_arg *arg, *argback;
  129     struct acl      *acl, *aclback;
  130 
  131     if (!msg)
  132         return;
  133     free(msg->id);
  134     free(msg->conf.file);
  135     list_for_each_entry_safe(arg, argback, &msg->args, list) {
  136         release_sample_expr(arg->expr);
  137         free(arg->name);
  138         LIST_DEL(&arg->list);
  139         free(arg);
  140     }
  141     list_for_each_entry_safe(acl, aclback, &msg->acls, list) {
  142         LIST_DEL(&acl->list);
  143         prune_acl(acl);
  144         free(acl);
  145     }
  146     if (msg->cond) {
  147         prune_acl_cond(msg->cond);
  148         free(msg->cond);
  149     }
  150     free(msg);
  151 }
  152 
  153 static void
  154 spoe_release_group(struct spoe_group *grp)
  155 {
  156     if (!grp)
  157         return;
  158     free(grp->id);
  159     free(grp->conf.file);
  160     free(grp);
  161 }
  162 
  163 static void
  164 spoe_release_agent(struct spoe_agent *agent)
  165 {
  166     struct spoe_message *msg, *msgback;
  167     struct spoe_group   *grp, *grpback;
  168     int                  i;
  169 
  170     if (!agent)
  171         return;
  172     free(agent->id);
  173     free(agent->conf.file);
  174     free(agent->var_pfx);
  175     free(agent->var_on_error);
  176     free(agent->var_t_process);
  177     free(agent->var_t_total);
  178     list_for_each_entry_safe(msg, msgback, &agent->messages, list) {
  179         LIST_DEL(&msg->list);
  180         spoe_release_message(msg);
  181     }
  182     list_for_each_entry_safe(grp, grpback, &agent->groups, list) {
  183         LIST_DEL(&grp->list);
  184         spoe_release_group(grp);
  185     }
  186     if (agent->rt) {
  187         for (i = 0; i < global.nbthread; ++i) {
  188             free(agent->rt[i].engine_id);
  189             HA_SPIN_DESTROY(&agent->rt[i].lock);
  190         }
  191     }
  192     free(agent->rt);
  193     free(agent);
  194 }
  195 
  196 static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
  197     [SPOE_FRM_ERR_NONE]               = "normal",
  198     [SPOE_FRM_ERR_IO]                 = "I/O error",
  199     [SPOE_FRM_ERR_TOUT]               = "a timeout occurred",
  200     [SPOE_FRM_ERR_TOO_BIG]            = "frame is too big",
  201     [SPOE_FRM_ERR_INVALID]            = "invalid frame received",
  202     [SPOE_FRM_ERR_NO_VSN]             = "version value not found",
  203     [SPOE_FRM_ERR_NO_FRAME_SIZE]      = "max-frame-size value not found",
  204     [SPOE_FRM_ERR_NO_CAP]             = "capabilities value not found",
  205     [SPOE_FRM_ERR_BAD_VSN]            = "unsupported version",
  206     [SPOE_FRM_ERR_BAD_FRAME_SIZE]     = "max-frame-size too big or too small",
  207     [SPOE_FRM_ERR_FRAG_NOT_SUPPORTED] = "fragmentation not supported",
  208     [SPOE_FRM_ERR_INTERLACED_FRAMES]  = "invalid interlaced frames",
  209     [SPOE_FRM_ERR_FRAMEID_NOTFOUND]   = "frame-id not found",
  210     [SPOE_FRM_ERR_RES]                = "resource allocation error",
  211     [SPOE_FRM_ERR_UNKNOWN]            = "an unknown error occurred",
  212 };
  213 
  214 static const char *spoe_event_str[SPOE_EV_EVENTS] = {
  215     [SPOE_EV_ON_CLIENT_SESS] = "on-client-session",
  216     [SPOE_EV_ON_TCP_REQ_FE]  = "on-frontend-tcp-request",
  217     [SPOE_EV_ON_TCP_REQ_BE]  = "on-backend-tcp-request",
  218     [SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request",
  219     [SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request",
  220 
  221     [SPOE_EV_ON_SERVER_SESS] = "on-server-session",
  222     [SPOE_EV_ON_TCP_RSP]     = "on-tcp-response",
  223     [SPOE_EV_ON_HTTP_RSP]    = "on-http-response",
  224 };
  225 
  226 
  227 #if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
  228 
  229 static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
  230     [SPOE_CTX_ST_NONE]          = "NONE",
  231     [SPOE_CTX_ST_READY]         = "READY",
  232     [SPOE_CTX_ST_ENCODING_MSGS] = "ENCODING_MSGS",
  233     [SPOE_CTX_ST_SENDING_MSGS]  = "SENDING_MSGS",
  234     [SPOE_CTX_ST_WAITING_ACK]   = "WAITING_ACK",
  235     [SPOE_CTX_ST_DONE]          = "DONE",
  236     [SPOE_CTX_ST_ERROR]         = "ERROR",
  237 };
  238 
  239 static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
  240     [SPOE_APPCTX_ST_CONNECT]             = "CONNECT",
  241     [SPOE_APPCTX_ST_CONNECTING]          = "CONNECTING",
  242     [SPOE_APPCTX_ST_IDLE]                = "IDLE",
  243     [SPOE_APPCTX_ST_PROCESSING]          = "PROCESSING",
  244     [SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY] = "SENDING_FRAG_NOTIFY",
  245     [SPOE_APPCTX_ST_WAITING_SYNC_ACK]    = "WAITING_SYNC_ACK",
  246     [SPOE_APPCTX_ST_DISCONNECT]          = "DISCONNECT",
  247     [SPOE_APPCTX_ST_DISCONNECTING]       = "DISCONNECTING",
  248     [SPOE_APPCTX_ST_EXIT]                = "EXIT",
  249     [SPOE_APPCTX_ST_END]                 = "END",
  250 };
  251 
  252 #endif
  253 
  254 /* Used to generates a unique id for an engine. On success, it returns a
  255  * allocated string. So it is the caller's reponsibility to release it. If the
  256  * allocation failed, it returns NULL. */
  257 static char *
  258 generate_pseudo_uuid()
  259 {
  260     char *uuid;
  261     uint32_t rnd[4] = { 0, 0, 0, 0 };
  262     uint64_t last = 0;
  263     int byte = 0;
  264     uint8_t bits = 0;
  265     unsigned int rand_max_bits = my_flsl(RAND_MAX);
  266 
  267     if ((uuid = calloc(1, 37)) == NULL)
  268         return NULL;
  269 
  270     while (byte < 4) {
  271         while (bits < 32) {
  272             last |= (uint64_t)random() << bits;
  273             bits += rand_max_bits;
  274         }
  275         rnd[byte++] = last;
  276         last >>= 32u;
  277         bits  -= 32;
  278     }
  279     snprintf(uuid, 37, "%8.8x-%4.4x-%4.4x-%4.4x-%12.12llx",
  280                  rnd[0],
  281                  rnd[1] & 0xFFFF,
  282                  ((rnd[1] >> 16u) & 0xFFF) | 0x4000,  // highest 4 bits indicate the uuid version
  283                  (rnd[2] & 0x3FFF) | 0x8000,  // the highest 2 bits indicate the UUID variant (10),
  284                  (long long)((rnd[2] >> 14u) | ((uint64_t) rnd[3] << 18u)) & 0xFFFFFFFFFFFFull
  285             );
  286     return uuid;
  287 }
  288 
  289 
  290 static inline void
  291 spoe_update_stat_time(struct timeval *tv, long *t)
  292 {
  293     if (*t == -1)
  294         *t = tv_ms_elapsed(tv, &now);
  295     else
  296         *t += tv_ms_elapsed(tv, &now);
  297     tv_zero(tv);
  298 }
  299 
  300 /********************************************************************
  301  * Functions that encode/decode SPOE frames
  302  ********************************************************************/
  303 /* Helper to get static string length, excluding the terminating null byte */
  304 #define SLEN(str) (sizeof(str)-1)
  305 
  306 /* Predefined key used in HELLO/DISCONNECT frames */
  307 #define SUPPORTED_VERSIONS_KEY     "supported-versions"
  308 #define VERSION_KEY                "version"
  309 #define MAX_FRAME_SIZE_KEY         "max-frame-size"
  310 #define CAPABILITIES_KEY           "capabilities"
  311 #define ENGINE_ID_KEY              "engine-id"
  312 #define HEALTHCHECK_KEY            "healthcheck"
  313 #define STATUS_CODE_KEY            "status-code"
  314 #define MSG_KEY                    "message"
  315 
  316 struct spoe_version {
  317     char *str;
  318     int   min;
  319     int   max;
  320 };
  321 
  322 /* All supported versions */
  323 static struct spoe_version supported_versions[] = {
  324     /* 1.0 is now unsupported because of a bug about frame's flags*/
  325     {"2.0", 2000, 2000},
  326     {NULL,  0, 0}
  327 };
  328 
  329 /* Comma-separated list of supported versions */
  330 #define SUPPORTED_VERSIONS_VAL  "2.0"
  331 
  332 /* Convert a string to a SPOE version value. The string must follow the format
  333  * "MAJOR.MINOR". It will be concerted into the integer (1000 * MAJOR + MINOR).
  334  * If an error occurred, -1 is returned. */
  335 static int
  336 spoe_str_to_vsn(const char *str, size_t len)
  337 {
  338     const char *p, *end;
  339     int   maj, min, vsn;
  340 
  341     p   = str;
  342     end = str+len;
  343     maj = min = 0;
  344     vsn = -1;
  345 
  346     /* skip leading spaces */
  347     while (p < end && isspace(*p))
  348         p++;
  349 
  350     /* parse Major number, until the '.' */
  351     while (*p != '.') {
  352         if (p >= end || *p < '0' || *p > '9')
  353             goto out;
  354         maj *= 10;
  355         maj += (*p - '0');
  356         p++;
  357     }
  358 
  359     /* check Major version */
  360     if (!maj)
  361         goto out;
  362 
  363     p++; /* skip the '.' */
  364     if (p >= end || *p < '0' || *p > '9') /* Minor number is missing */
  365         goto out;
  366 
  367     /* Parse Minor number */
  368     while (p < end) {
  369         if (*p < '0' || *p > '9')
  370             break;
  371         min *= 10;
  372         min += (*p - '0');
  373         p++;
  374     }
  375 
  376     /* check Minor number */
  377     if (min > 999)
  378         goto out;
  379 
  380     /* skip trailing spaces */
  381     while (p < end && isspace(*p))
  382         p++;
  383     if (p != end)
  384         goto out;
  385 
  386     vsn = maj * 1000 + min;
  387   out:
  388     return vsn;
  389 }
  390 
  391 /* Encode the HELLO frame sent by HAProxy to an agent. It returns the number of
  392  * encoded bytes in the frame on success, 0 if an encoding error occurred and -1
  393  * if a fatal error occurred. */
  394 static int
  395 spoe_prepare_hahello_frame(struct appctx *appctx, char *frame, size_t size)
  396 {
  397     struct buffer      *chk;
  398     struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
  399     char              *p, *end;
  400     unsigned int       flags = SPOE_FRM_FL_FIN;
  401     size_t             sz;
  402 
  403     p   = frame;
  404     end = frame+size;
  405 
  406     /* Set Frame type */
  407     *p++ = SPOE_FRM_T_HAPROXY_HELLO;
  408 
  409     /* Set flags */
  410     flags = htonl(flags);
  411     memcpy(p, (char *)&flags, 4);
  412     p += 4;
  413 
  414     /* No stream-id and frame-id for HELLO frames */
  415     *p++ = 0; *p++ = 0;
  416 
  417     /* There are 3 mandatory items: "supported-versions", "max-frame-size"
  418      * and "capabilities" */
  419 
  420     /* "supported-versions" K/V item */
  421     sz = SLEN(SUPPORTED_VERSIONS_KEY);
  422     if (spoe_encode_buffer(SUPPORTED_VERSIONS_KEY, sz, &p, end) == -1)
  423         goto too_big;
  424 
  425     *p++ = SPOE_DATA_T_STR;
  426     sz = SLEN(SUPPORTED_VERSIONS_VAL);
  427     if (spoe_encode_buffer(SUPPORTED_VERSIONS_VAL, sz, &p, end) == -1)
  428         goto too_big;
  429 
  430     /* "max-fram-size" K/V item */
  431     sz = SLEN(MAX_FRAME_SIZE_KEY);
  432     if (spoe_encode_buffer(MAX_FRAME_SIZE_KEY, sz, &p, end) == -1)
  433         goto too_big;
  434 
  435     *p++ = SPOE_DATA_T_UINT32;
  436     if (encode_varint(SPOE_APPCTX(appctx)->max_frame_size, &p, end) == -1)
  437         goto too_big;
  438 
  439     /* "capabilities" K/V item */
  440     sz = SLEN(CAPABILITIES_KEY);
  441     if (spoe_encode_buffer(CAPABILITIES_KEY, sz, &p, end) == -1)
  442         goto too_big;
  443 
  444     *p++ = SPOE_DATA_T_STR;
  445     chk = get_trash_chunk();
  446     if (agent != NULL && (agent->flags & SPOE_FL_PIPELINING)) {
  447         memcpy(chk->area, "pipelining", 10);
  448         chk->data += 10;
  449     }
  450     if (agent != NULL && (agent->flags & SPOE_FL_ASYNC)) {
  451         if (chk->data) chk->area[chk->data++] = ',';
  452         memcpy(chk->area+chk->data, "async", 5);
  453         chk->data += 5;
  454     }
  455     if (agent != NULL && (agent->flags & SPOE_FL_RCV_FRAGMENTATION)) {
  456         if (chk->data) chk->area[chk->data++] = ',';
  457         memcpy(chk->area+chk->data, "fragmentation", 13);
  458         chk->data += 13;
  459     }
  460     if (spoe_encode_buffer(chk->area, chk->data, &p, end) == -1)
  461         goto too_big;
  462 
  463     /* (optionnal) "engine-id" K/V item, if present */
  464     if (agent != NULL && agent->rt[tid].engine_id != NULL) {
  465         sz = SLEN(ENGINE_ID_KEY);
  466         if (spoe_encode_buffer(ENGINE_ID_KEY, sz, &p, end) == -1)
  467             goto too_big;
  468 
  469         *p++ = SPOE_DATA_T_STR;
  470         sz = strlen(agent->rt[tid].engine_id);
  471         if (spoe_encode_buffer(agent->rt[tid].engine_id, sz, &p, end) == -1)
  472             goto too_big;
  473     }
  474 
  475     return (p - frame);
  476 
  477   too_big:
  478     SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG;
  479     return 0;
  480 }
  481 
  482 /* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the number of
  483  * encoded bytes in the frame on success, 0 if an encoding error occurred and -1
  484  * if a fatal error occurred.  */
  485 static int
  486 spoe_prepare_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
  487 {
  488     const char  *reason;
  489     char        *p, *end;
  490     unsigned int flags = SPOE_FRM_FL_FIN;
  491     size_t       sz;
  492 
  493     p   = frame;
  494     end = frame+size;
  495 
  496      /* Set Frame type */
  497     *p++ = SPOE_FRM_T_HAPROXY_DISCON;
  498 
  499     /* Set flags */
  500     flags = htonl(flags);
  501     memcpy(p, (char *)&flags, 4);
  502     p += 4;
  503 
  504     /* No stream-id and frame-id for DISCONNECT frames */
  505     *p++ = 0; *p++ = 0;
  506 
  507     if (SPOE_APPCTX(appctx)->status_code >= SPOE_FRM_ERRS)
  508         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_UNKNOWN;
  509 
  510     /* There are 2 mandatory items: "status-code" and "message" */
  511 
  512     /* "status-code" K/V item */
  513     sz = SLEN(STATUS_CODE_KEY);
  514     if (spoe_encode_buffer(STATUS_CODE_KEY, sz, &p, end) == -1)
  515         goto too_big;
  516 
  517     *p++ = SPOE_DATA_T_UINT32;
  518     if (encode_varint(SPOE_APPCTX(appctx)->status_code, &p, end) == -1)
  519         goto too_big;
  520 
  521     /* "message" K/V item */
  522     sz = SLEN(MSG_KEY);
  523     if (spoe_encode_buffer(MSG_KEY, sz, &p, end) == -1)
  524         goto too_big;
  525 
  526     /*Get the message corresponding to the status code */
  527     reason = spoe_frm_err_reasons[SPOE_APPCTX(appctx)->status_code];
  528 
  529     *p++ = SPOE_DATA_T_STR;
  530     sz = strlen(reason);
  531     if (spoe_encode_buffer(reason, sz, &p, end) == -1)
  532         goto too_big;
  533 
  534     return (p - frame);
  535 
  536   too_big:
  537     SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG;
  538     return 0;
  539 }
  540 
  541 /* Encode the NOTIFY frame sent by HAProxy to an agent. It returns the number of
  542  * encoded bytes in the frame on success, 0 if an encoding error occurred and -1
  543  * if a fatal error occurred. */
  544 static int
  545 spoe_prepare_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
  546                 char *frame, size_t size)
  547 {
  548     char        *p, *end;
  549     unsigned int stream_id, frame_id;
  550     unsigned int flags = SPOE_FRM_FL_FIN;
  551     size_t       sz;
  552 
  553     p   = frame;
  554     end = frame+size;
  555 
  556     stream_id = ctx->stream_id;
  557     frame_id  = ctx->frame_id;
  558 
  559     if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) {
  560         /* The fragmentation is not supported by the applet */
  561         if (!(SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_FRAGMENTATION)) {
  562             SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
  563             return -1;
  564         }
  565         flags = ctx->frag_ctx.flags;
  566     }
  567 
  568     /* Set Frame type */
  569     *p++ = SPOE_FRM_T_HAPROXY_NOTIFY;
  570 
  571     /* Set flags */
  572     flags = htonl(flags);
  573     memcpy(p, (char *)&flags, 4);
  574     p += 4;
  575 
  576     /* Set stream-id and frame-id */
  577     if (encode_varint(stream_id, &p, end) == -1)
  578         goto too_big;
  579     if (encode_varint(frame_id, &p, end) == -1)
  580         goto too_big;
  581 
  582     /* Copy encoded messages, if possible */
  583     sz = b_data(&ctx->buffer);
  584     if (p + sz >= end)
  585         goto too_big;
  586     memcpy(p, b_head(&ctx->buffer), sz);
  587     p += sz;
  588 
  589     return (p - frame);
  590 
  591   too_big:
  592     SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG;
  593     return 0;
  594 }
  595 
  596 /* Encode next part of a fragmented frame sent by HAProxy to an agent. It
  597  * returns the number of encoded bytes in the frame on success, 0 if an encoding
  598  * error occurred and -1 if a fatal error occurred. */
  599 static int
  600 spoe_prepare_hafrag_frame(struct appctx *appctx, struct spoe_context *ctx,
  601               char *frame, size_t size)
  602 {
  603     char        *p, *end;
  604     unsigned int stream_id, frame_id;
  605     unsigned int flags;
  606     size_t       sz;
  607 
  608     p   = frame;
  609     end = frame+size;
  610 
  611     /* <ctx> is null when the stream has aborted the processing of a
  612      * fragmented frame. In this case, we must notify the corresponding
  613      * agent using ids stored in <frag_ctx>. */
  614     if (ctx == NULL) {
  615         flags     = (SPOE_FRM_FL_FIN|SPOE_FRM_FL_ABRT);
  616         stream_id = SPOE_APPCTX(appctx)->frag_ctx.cursid;
  617         frame_id  = SPOE_APPCTX(appctx)->frag_ctx.curfid;
  618     }
  619     else {
  620         flags     = ctx->frag_ctx.flags;
  621         stream_id = ctx->stream_id;
  622         frame_id  = ctx->frame_id;
  623     }
  624 
  625     /* Set Frame type */
  626     *p++ = SPOE_FRM_T_UNSET;
  627 
  628     /* Set flags */
  629     flags = htonl(flags);
  630     memcpy(p, (char *)&flags, 4);
  631     p += 4;
  632 
  633     /* Set stream-id and frame-id */
  634     if (encode_varint(stream_id, &p, end) == -1)
  635         goto too_big;
  636     if (encode_varint(frame_id, &p, end) == -1)
  637         goto too_big;
  638 
  639     if (ctx == NULL)
  640         goto end;
  641 
  642     /* Copy encoded messages, if possible */
  643     sz = b_data(&ctx->buffer);
  644     if (p + sz >= end)
  645         goto too_big;
  646     memcpy(p, b_head(&ctx->buffer), sz);
  647     p += sz;
  648 
  649   end:
  650     return (p - frame);
  651 
  652   too_big:
  653     SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG;
  654     return 0;
  655 }
  656 
  657 /* Decode and process the HELLO frame sent by an agent. It returns the number of
  658  * read bytes on success, 0 if a decoding error occurred, and -1 if a fatal
  659  * error occurred. */
  660 static int
  661 spoe_handle_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
  662 {
  663     struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
  664     char              *p, *end;
  665     int                vsn, max_frame_size;
  666     unsigned int       flags;
  667 
  668     p   = frame;
  669     end = frame + size;
  670 
  671     /* Check frame type */
  672     if (*p++ != SPOE_FRM_T_AGENT_HELLO) {
  673         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  674         return 0;
  675     }
  676 
  677     if (size < 7 /* TYPE + METADATA */) {
  678         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  679         return 0;
  680     }
  681 
  682     /* Retrieve flags */
  683     memcpy((char *)&flags, p, 4);
  684     flags = ntohl(flags);
  685     p += 4;
  686 
  687     /* Fragmentation is not supported for HELLO frame */
  688     if (!(flags & SPOE_FRM_FL_FIN)) {
  689         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
  690         return -1;
  691     }
  692 
  693     /* stream-id and frame-id must be cleared */
  694     if (*p != 0 || *(p+1) != 0) {
  695         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  696         return 0;
  697     }
  698     p += 2;
  699 
  700     /* There are 3 mandatory items: "version", "max-frame-size" and
  701      * "capabilities" */
  702 
  703     /* Loop on K/V items */
  704     vsn = max_frame_size = flags = 0;
  705     while (p < end) {
  706         char  *str;
  707         uint64_t sz;
  708         int    ret;
  709 
  710         /* Decode the item key */
  711         ret = spoe_decode_buffer(&p, end, &str, &sz);
  712         if (ret == -1 || !sz) {
  713             SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  714             return 0;
  715         }
  716 
  717         /* Check "version" K/V item */
  718         if (!memcmp(str, VERSION_KEY, sz)) {
  719             int i, type = *p++;
  720 
  721             /* The value must be a string */
  722             if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
  723                 SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  724                 return 0;
  725             }
  726             if (spoe_decode_buffer(&p, end, &str, &sz) == -1) {
  727                 SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  728                 return 0;
  729             }
  730 
  731             vsn = spoe_str_to_vsn(str, sz);
  732             if (vsn == -1) {
  733                 SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_BAD_VSN;
  734                 return -1;
  735             }
  736             for (i = 0; supported_versions[i].str != NULL; ++i) {
  737                 if (vsn >= supported_versions[i].min &&
  738                     vsn <= supported_versions[i].max)
  739                     break;
  740             }
  741             if (supported_versions[i].str == NULL) {
  742                 SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_BAD_VSN;
  743                 return -1;
  744             }
  745         }
  746         /* Check "max-frame-size" K/V item */
  747         else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
  748             int type = *p++;
  749 
  750             /* The value must be integer */
  751             if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
  752                 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
  753                 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
  754                 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
  755                 SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  756                 return 0;
  757             }
  758             if (decode_varint(&p, end, &sz) == -1) {
  759                 SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  760                 return 0;
  761             }
  762             if (sz < MIN_FRAME_SIZE ||
  763                 sz > SPOE_APPCTX(appctx)->max_frame_size) {
  764                 SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
  765                 return -1;
  766             }
  767             max_frame_size = sz;
  768         }
  769         /* Check "capabilities" K/V item */
  770         else if (!memcmp(str, CAPABILITIES_KEY, sz)) {
  771             int type = *p++;
  772 
  773             /* The value must be a string */
  774             if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
  775                 SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  776                 return 0;
  777             }
  778             if (spoe_decode_buffer(&p, end, &str, &sz) == -1) {
  779                 SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  780                 return 0;
  781             }
  782 
  783             while (sz) {
  784                 char *delim;
  785 
  786                 /* Skip leading spaces */
  787                 for (; isspace(*str) && sz; str++, sz--);
  788 
  789                 if (sz >= 10 && !strncmp(str, "pipelining", 10)) {
  790                     str += 10; sz -= 10;
  791                     if (!sz || isspace(*str) || *str == ',')
  792                         flags |= SPOE_APPCTX_FL_PIPELINING;
  793                 }
  794                 else if (sz >= 5 && !strncmp(str, "async", 5)) {
  795                     str += 5; sz -= 5;
  796                     if (!sz || isspace(*str) || *str == ',')
  797                         flags |= SPOE_APPCTX_FL_ASYNC;
  798                 }
  799                 else if (sz >= 13 && !strncmp(str, "fragmentation", 13)) {
  800                     str += 13; sz -= 13;
  801                     if (!sz || isspace(*str) || *str == ',')
  802                         flags |= SPOE_APPCTX_FL_FRAGMENTATION;
  803                 }
  804 
  805                 /* Get the next comma or break */
  806                 if (!sz || (delim = memchr(str, ',', sz)) == NULL)
  807                     break;
  808                 delim++;
  809                 sz -= (delim - str);
  810                 str = delim;
  811             }
  812         }
  813         else {
  814             /* Silently ignore unknown item */
  815             if (spoe_skip_data(&p, end) == -1) {
  816                 SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  817                 return 0;
  818             }
  819         }
  820     }
  821 
  822     /* Final checks */
  823     if (!vsn) {
  824         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NO_VSN;
  825         return -1;
  826     }
  827     if (!max_frame_size) {
  828         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
  829         return -1;
  830     }
  831     if (!agent)
  832         flags &= ~(SPOE_APPCTX_FL_PIPELINING|SPOE_APPCTX_FL_ASYNC);
  833     else {
  834         if ((flags & SPOE_APPCTX_FL_PIPELINING) && !(agent->flags & SPOE_FL_PIPELINING))
  835             flags &= ~SPOE_APPCTX_FL_PIPELINING;
  836         if ((flags & SPOE_APPCTX_FL_ASYNC) && !(agent->flags & SPOE_FL_ASYNC))
  837             flags &= ~SPOE_APPCTX_FL_ASYNC;
  838     }
  839 
  840     SPOE_APPCTX(appctx)->version        = (unsigned int)vsn;
  841     SPOE_APPCTX(appctx)->max_frame_size = (unsigned int)max_frame_size;
  842     SPOE_APPCTX(appctx)->flags         |= flags;
  843 
  844     return (p - frame);
  845 }
  846 
  847 /* Decode DISCONNECT frame sent by an agent. It returns the number of by read
  848  * bytes on success, 0 if the frame can be ignored and -1 if an error
  849  * occurred. */
  850 static int
  851 spoe_handle_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
  852 {
  853     char        *p, *end;
  854     unsigned int flags;
  855 
  856     p   = frame;
  857     end = frame + size;
  858 
  859     /* Check frame type */
  860     if (*p++ != SPOE_FRM_T_AGENT_DISCON) {
  861         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  862         return 0;
  863     }
  864 
  865     if (size < 7 /* TYPE + METADATA */) {
  866         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  867         return 0;
  868     }
  869 
  870     /* Retrieve flags */
  871     memcpy((char *)&flags, p, 4);
  872     flags = ntohl(flags);
  873     p += 4;
  874 
  875     /* Fragmentation is not supported for DISCONNECT frame */
  876     if (!(flags & SPOE_FRM_FL_FIN)) {
  877         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
  878         return -1;
  879     }
  880 
  881     /* stream-id and frame-id must be cleared */
  882     if (*p != 0 || *(p+1) != 0) {
  883         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  884         return 0;
  885     }
  886     p += 2;
  887 
  888     /* There are 2 mandatory items: "status-code" and "message" */
  889 
  890     /* Loop on K/V items */
  891     while (p < end) {
  892         char  *str;
  893         uint64_t sz;
  894         int    ret;
  895 
  896         /* Decode the item key */
  897         ret = spoe_decode_buffer(&p, end, &str, &sz);
  898         if (ret == -1 || !sz) {
  899             SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  900             return 0;
  901         }
  902 
  903         /* Check "status-code" K/V item */
  904         if (!memcmp(str, STATUS_CODE_KEY, sz)) {
  905             int type = *p++;
  906 
  907             /* The value must be an integer */
  908             if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
  909                 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
  910                 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
  911                 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
  912                 SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  913                 return 0;
  914             }
  915             if (decode_varint(&p, end, &sz) == -1) {
  916                 SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  917                 return 0;
  918             }
  919             SPOE_APPCTX(appctx)->status_code = sz;
  920         }
  921 
  922         /* Check "message" K/V item */
  923         else if (!memcmp(str, MSG_KEY, sz)) {
  924             int type = *p++;
  925 
  926             /* The value must be a string */
  927             if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
  928                 SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  929                 return 0;
  930             }
  931             ret = spoe_decode_buffer(&p, end, &str, &sz);
  932             if (ret == -1 || sz > 255) {
  933                 SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  934                 return 0;
  935             }
  936 #if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
  937             SPOE_APPCTX(appctx)->reason = str;
  938             SPOE_APPCTX(appctx)->rlen   = sz;
  939 #endif
  940         }
  941         else {
  942             /* Silently ignore unknown item */
  943             if (spoe_skip_data(&p, end) == -1) {
  944                 SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  945                 return 0;
  946             }
  947         }
  948     }
  949 
  950     return (p - frame);
  951 }
  952 
  953 
  954 /* Decode ACK frame sent by an agent. It returns the number of read bytes on
  955  * success, 0 if the frame can be ignored and -1 if an error occurred. */
  956 static int
  957 spoe_handle_agentack_frame(struct appctx *appctx, struct spoe_context **ctx,
  958                char *frame, size_t size)
  959 {
  960     struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
  961     char              *p, *end;
  962     uint64_t           stream_id, frame_id;
  963     int                len;
  964     unsigned int       flags;
  965 
  966     p    = frame;
  967     end  = frame + size;
  968     *ctx = NULL;
  969 
  970     /* Check frame type */
  971     if (*p++ != SPOE_FRM_T_AGENT_ACK) {
  972         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  973         return 0;
  974     }
  975 
  976     if (size < 7 /* TYPE + METADATA */) {
  977         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  978         return 0;
  979     }
  980 
  981     /* Retrieve flags */
  982     memcpy((char *)&flags, p, 4);
  983     flags = ntohl(flags);
  984     p += 4;
  985 
  986     /* Fragmentation is not supported for now */
  987     if (!(flags & SPOE_FRM_FL_FIN)) {
  988         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
  989         return -1;
  990     }
  991 
  992     /* Get the stream-id and the frame-id */
  993     if (decode_varint(&p, end, &stream_id) == -1) {
  994         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  995         return 0;
  996     }
  997     if (decode_varint(&p, end, &frame_id) == -1) {
  998         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
  999         return 0;
 1000     }
 1001 
 1002     /* Try to find the corresponding SPOE context */
 1003     if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
 1004         list_for_each_entry((*ctx), &agent->rt[tid].waiting_queue, list) {
 1005             if ((*ctx)->stream_id == (unsigned int)stream_id &&
 1006                 (*ctx)->frame_id  == (unsigned int)frame_id)
 1007                 goto found;
 1008         }
 1009     }
 1010     else {
 1011         list_for_each_entry((*ctx), &SPOE_APPCTX(appctx)->waiting_queue, list) {
 1012             if ((*ctx)->stream_id == (unsigned int)stream_id &&
 1013                  (*ctx)->frame_id == (unsigned int)frame_id)
 1014                 goto found;
 1015         }
 1016     }
 1017 
 1018     if (SPOE_APPCTX(appctx)->frag_ctx.ctx &&
 1019         SPOE_APPCTX(appctx)->frag_ctx.cursid == (unsigned int)stream_id &&
 1020         SPOE_APPCTX(appctx)->frag_ctx.curfid == (unsigned int)frame_id) {
 1021 
 1022         /* ABRT bit is set for an unfinished fragmented frame */
 1023         if (flags & SPOE_FRM_FL_ABRT) {
 1024             *ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx;
 1025             (*ctx)->state = SPOE_CTX_ST_ERROR;
 1026             (*ctx)->status_code = SPOE_CTX_ERR_FRAG_FRAME_ABRT;
 1027             /* Ignore the payload */
 1028             goto end;
 1029         }
 1030         /* TODO: Handle more flags for fragmented frames: RESUME, FINISH... */
 1031         /*       For now, we ignore the ack */
 1032         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_INVALID;
 1033         return 0;
 1034     }
 1035 
 1036     /* No Stream found, ignore the frame */
 1037     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
 1038             " - Ignore ACK frame"
 1039             " - stream-id=%u - frame-id=%u\n",
 1040             (int)now.tv_sec, (int)now.tv_usec, agent->id,
 1041             __FUNCTION__, appctx,
 1042             (unsigned int)stream_id, (unsigned int)frame_id);
 1043 
 1044     SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAMEID_NOTFOUND;
 1045     if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
 1046         return -1;
 1047     return 0;
 1048 
 1049   found:
 1050     if (!spoe_acquire_buffer(&SPOE_APPCTX(appctx)->buffer,
 1051                  &SPOE_APPCTX(appctx)->buffer_wait)) {
 1052         *ctx = NULL;
 1053         return 1; /* Retry later */
 1054     }
 1055 
 1056     /* Copy encoded actions */
 1057     len = (end - p);
 1058     memcpy(b_head(&SPOE_APPCTX(appctx)->buffer), p, len);
 1059     b_set_data(&SPOE_APPCTX(appctx)->buffer, len);
 1060     p += len;
 1061 
 1062     /* Transfer the buffer ownership to the SPOE context */
 1063     (*ctx)->buffer = SPOE_APPCTX(appctx)->buffer;
 1064     SPOE_APPCTX(appctx)->buffer = BUF_NULL;
 1065 
 1066     (*ctx)->state = SPOE_CTX_ST_DONE;
 1067 
 1068   end:
 1069     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
 1070             " - ACK frame received"
 1071             " - ctx=%p - stream-id=%u - frame-id=%u - flags=0x%08x\n",
 1072             (int)now.tv_sec, (int)now.tv_usec, agent->id,
 1073             __FUNCTION__, appctx, *ctx, (*ctx)->stream_id,
 1074             (*ctx)->frame_id, flags);
 1075     return (p - frame);
 1076 }
 1077 
 1078 /* This function is used in cfgparse.c and declared in proto/checks.h. It
 1079  * prepare the request to send to agents during a healthcheck. It returns 0 on
 1080  * success and -1 if an error occurred. */
 1081 int
 1082 spoe_prepare_healthcheck_request(char **req, int *len)
 1083 {
 1084     struct appctx      appctx;
 1085     struct spoe_appctx spoe_appctx;
 1086     char  *frame, *end, buf[MAX_FRAME_SIZE+4];
 1087     size_t sz;
 1088     int    ret;
 1089 
 1090     memset(&appctx, 0, sizeof(appctx));
 1091     memset(&spoe_appctx, 0, sizeof(spoe_appctx));
 1092     memset(buf, 0, sizeof(buf));
 1093 
 1094     appctx.ctx.spoe.ptr = &spoe_appctx;
 1095     SPOE_APPCTX(&appctx)->max_frame_size = MAX_FRAME_SIZE;
 1096 
 1097     frame = buf+4; /* Reserved the 4 first bytes for the frame size */
 1098     end   = frame + MAX_FRAME_SIZE;
 1099 
 1100     ret = spoe_prepare_hahello_frame(&appctx, frame, MAX_FRAME_SIZE);
 1101     if (ret <= 0)
 1102         return -1;
 1103     frame += ret;
 1104 
 1105     /* Add "healthcheck" K/V item */
 1106     sz = SLEN(HEALTHCHECK_KEY);
 1107     if (spoe_encode_buffer(HEALTHCHECK_KEY, sz, &frame, end) == -1)
 1108         return -1;
 1109     *frame++ = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE);
 1110 
 1111     *len = frame - buf;
 1112     sz   = htonl(*len - 4);
 1113     memcpy(buf, (char *)&sz, 4);
 1114 
 1115     if ((*req = malloc(*len)) == NULL)
 1116         return -1;
 1117     memcpy(*req, buf, *len);
 1118     return 0;
 1119 }
 1120 
 1121 /* This function is used in checks.c and declared in proto/checks.h. It decode
 1122  * the response received from an agent during a healthcheck. It returns 0 on
 1123  * success and -1 if an error occurred. */
 1124 int
 1125 spoe_handle_healthcheck_response(char *frame, size_t size, char *err, int errlen)
 1126 {
 1127     struct appctx      appctx;
 1128     struct spoe_appctx spoe_appctx;
 1129 
 1130     memset(&appctx, 0, sizeof(appctx));
 1131     memset(&spoe_appctx, 0, sizeof(spoe_appctx));
 1132 
 1133     appctx.ctx.spoe.ptr = &spoe_appctx;
 1134     SPOE_APPCTX(&appctx)->max_frame_size = MAX_FRAME_SIZE;
 1135 
 1136     if (*frame == SPOE_FRM_T_AGENT_DISCON) {
 1137         spoe_handle_agentdiscon_frame(&appctx, frame, size);
 1138         goto error;
 1139     }
 1140     if (spoe_handle_agenthello_frame(&appctx, frame, size) <= 0)
 1141         goto error;
 1142 
 1143     return 0;
 1144 
 1145   error:
 1146     if (SPOE_APPCTX(&appctx)->status_code >= SPOE_FRM_ERRS)
 1147         SPOE_APPCTX(&appctx)->status_code = SPOE_FRM_ERR_UNKNOWN;
 1148     strncpy(err, spoe_frm_err_reasons[SPOE_APPCTX(&appctx)->status_code], errlen);
 1149     return -1;
 1150 }
 1151 
 1152 /* Send a SPOE frame to an agent. It returns -1 when an error occurred, 0 when
 1153  * the frame can be ignored, 1 to retry later, and the frame legnth on
 1154  * success. */
 1155 static int
 1156 spoe_send_frame(struct appctx *appctx, char *buf, size_t framesz)
 1157 {
 1158     struct stream_interface *si = appctx->owner;
 1159     int      ret;
 1160     uint32_t netint;
 1161 
 1162     /* 4 bytes are reserved at the beginning of <buf> to store the frame
 1163      * length. */
 1164     netint = htonl(framesz);
 1165     memcpy(buf, (char *)&netint, 4);
 1166     ret = ci_putblk(si_ic(si), buf, framesz+4);
 1167     if (ret <= 0) {
 1168         if ((ret == -3 && b_is_null(&si_ic(si)->buf)) || ret == -1) {
 1169             si_rx_room_blk(si);
 1170             return 1; /* retry */
 1171         }
 1172         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO;
 1173         return -1; /* error */
 1174     }
 1175     return framesz;
 1176 }
 1177 
 1178 /* Receive a SPOE frame from an agent. It return -1 when an error occurred, 0
 1179  * when the frame can be ignored, 1 to retry later and the frame length on
 1180  * success. */
 1181 static int
 1182 spoe_recv_frame(struct appctx *appctx, char *buf, size_t framesz)
 1183 {
 1184     struct stream_interface *si = appctx->owner;
 1185     int      ret;
 1186     uint32_t netint;
 1187 
 1188     ret = co_getblk(si_oc(si), (char *)&netint, 4, 0);
 1189     if (ret > 0) {
 1190         framesz = ntohl(netint);
 1191         if (framesz > SPOE_APPCTX(appctx)->max_frame_size) {
 1192             SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG;
 1193             return -1;
 1194         }
 1195         ret = co_getblk(si_oc(si), buf, framesz, 4);
 1196     }
 1197     if (ret <= 0) {
 1198         if (ret == 0) {
 1199             return 1; /* retry */
 1200         }
 1201         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO;
 1202         return -1; /* error */
 1203     }
 1204     return framesz;
 1205 }
 1206 
 1207 /********************************************************************
 1208  * Functions that manage the SPOE applet
 1209  ********************************************************************/
 1210 static int
 1211 spoe_wakeup_appctx(struct appctx *appctx)
 1212 {
 1213     si_want_get(appctx->owner);
 1214     si_rx_endp_more(appctx->owner);
 1215     appctx_wakeup(appctx);
 1216     return 1;
 1217 }
 1218 
 1219 /* Callback function that catches applet timeouts. If a timeout occurred, we set
 1220  * <appctx->st1> flag and the SPOE applet is woken up. */
 1221 static struct task *
 1222 spoe_process_appctx(struct task * task, void *context, unsigned short state)
 1223 {
 1224     struct appctx *appctx = context;
 1225 
 1226     appctx->st1 = SPOE_APPCTX_ERR_NONE;
 1227     if (tick_is_expired(task->expire, now_ms)) {
 1228         task->expire = TICK_ETERNITY;
 1229         appctx->st1  = SPOE_APPCTX_ERR_TOUT;
 1230     }
 1231     spoe_wakeup_appctx(appctx);
 1232     return task;
 1233 }
 1234 
 1235 /* Callback function that releases a SPOE applet. This happens when the
 1236  * connection with the agent is closed. */
 1237 static void
 1238 spoe_release_appctx(struct appctx *appctx)
 1239 {
 1240     struct stream_interface *si          = appctx->owner;
 1241     struct spoe_appctx      *spoe_appctx = SPOE_APPCTX(appctx);
 1242     struct spoe_agent       *agent;
 1243     struct spoe_context     *ctx, *back;
 1244 
 1245     if (spoe_appctx == NULL)
 1246         return;
 1247 
 1248     appctx->ctx.spoe.ptr = NULL;
 1249     agent = spoe_appctx->agent;
 1250 
 1251     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
 1252             (int)now.tv_sec, (int)now.tv_usec, agent->id,
 1253             __FUNCTION__, appctx);
 1254 
 1255     /* Remove applet from the list of running applets */
 1256     _HA_ATOMIC_SUB(&agent->counters.applets, 1);
 1257     HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 1258     if (!LIST_ISEMPTY(&spoe_appctx->list)) {
 1259         LIST_DEL(&spoe_appctx->list);
 1260         LIST_INIT(&spoe_appctx->list);
 1261     }
 1262     HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 1263 
 1264     /* Shutdown the server connection, if needed */
 1265     if (appctx->st0 != SPOE_APPCTX_ST_END) {
 1266         if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
 1267             eb32_delete(&spoe_appctx->node);
 1268             _HA_ATOMIC_SUB(&agent->counters.idles, 1);
 1269         }
 1270 
 1271         appctx->st0 = SPOE_APPCTX_ST_END;
 1272         if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE)
 1273             spoe_appctx->status_code = SPOE_FRM_ERR_IO;
 1274 
 1275         si_shutw(si);
 1276         si_shutr(si);
 1277         si_ic(si)->flags |= CF_READ_NULL;
 1278     }
 1279 
 1280     /* Destroy the task attached to this applet */
 1281     task_destroy(spoe_appctx->task);
 1282 
 1283     /* Notify all waiting streams */
 1284     list_for_each_entry_safe(ctx, back, &spoe_appctx->waiting_queue, list) {
 1285         LIST_DEL(&ctx->list);
 1286         LIST_INIT(&ctx->list);
 1287         _HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
 1288         spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
 1289         ctx->state = SPOE_CTX_ST_ERROR;
 1290         ctx->status_code = (spoe_appctx->status_code + 0x100);
 1291         task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 1292     }
 1293 
 1294     /* If the applet was processing a fragmented frame, notify the
 1295      * corresponding stream. */
 1296     if (spoe_appctx->frag_ctx.ctx) {
 1297         ctx = spoe_appctx->frag_ctx.ctx;
 1298         ctx->spoe_appctx = NULL;
 1299         ctx->state = SPOE_CTX_ST_ERROR;
 1300         ctx->status_code = (spoe_appctx->status_code + 0x100);
 1301         task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 1302     }
 1303 
 1304     if (!LIST_ISEMPTY(&agent->rt[tid].applets))
 1305         goto end;
 1306 
 1307     /* If this was the last running applet, notify all waiting streams */
 1308     list_for_each_entry_safe(ctx, back, &agent->rt[tid].sending_queue, list) {
 1309         LIST_DEL(&ctx->list);
 1310         LIST_INIT(&ctx->list);
 1311         _HA_ATOMIC_SUB(&agent->counters.nb_sending, 1);
 1312         spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
 1313         ctx->state = SPOE_CTX_ST_ERROR;
 1314         ctx->status_code = (spoe_appctx->status_code + 0x100);
 1315         task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 1316     }
 1317     list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) {
 1318         LIST_DEL(&ctx->list);
 1319         LIST_INIT(&ctx->list);
 1320         _HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
 1321         spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
 1322         ctx->state = SPOE_CTX_ST_ERROR;
 1323         ctx->status_code = (spoe_appctx->status_code + 0x100);
 1324         task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 1325     }
 1326 
 1327   end:
 1328     /* Release allocated memory */
 1329     spoe_release_buffer(&spoe_appctx->buffer,
 1330                 &spoe_appctx->buffer_wait);
 1331     pool_free(pool_head_spoe_appctx, spoe_appctx);
 1332 
 1333     /* Update runtinme agent info */
 1334     agent->rt[tid].frame_size = agent->max_frame_size;
 1335     list_for_each_entry(spoe_appctx, &agent->rt[tid].applets, list)
 1336         HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, spoe_appctx->max_frame_size);
 1337 }
 1338 
 1339 static int
 1340 spoe_handle_connect_appctx(struct appctx *appctx)
 1341 {
 1342     struct stream_interface *si    = appctx->owner;
 1343     struct spoe_agent       *agent = SPOE_APPCTX(appctx)->agent;
 1344     char *frame, *buf;
 1345     int   ret;
 1346 
 1347     if (si_state_in(si->state, SI_SB_CER|SI_SB_DIS|SI_SB_CLO)) {
 1348         /* closed */
 1349         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO;
 1350         goto exit;
 1351     }
 1352 
 1353     if (!si_state_in(si->state, SI_SB_RDY|SI_SB_EST)) {
 1354         /* not connected yet */
 1355         si_rx_endp_more(si);
 1356         task_wakeup(si_strm(si)->task, TASK_WOKEN_MSG);
 1357         goto stop;
 1358     }
 1359 
 1360     if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
 1361         SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
 1362                 " - Connection timed out\n",
 1363                 (int)now.tv_sec, (int)now.tv_usec, agent->id,
 1364                 __FUNCTION__, appctx);
 1365         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOUT;
 1366         goto exit;
 1367     }
 1368 
 1369     if (SPOE_APPCTX(appctx)->task->expire == TICK_ETERNITY)
 1370         SPOE_APPCTX(appctx)->task->expire =
 1371             tick_add_ifset(now_ms, agent->timeout.hello);
 1372 
 1373     /* 4 bytes are reserved at the beginning of <buf> to store the frame
 1374      * length. */
 1375     buf = trash.area; frame = buf+4;
 1376     ret = spoe_prepare_hahello_frame(appctx, frame,
 1377                      SPOE_APPCTX(appctx)->max_frame_size);
 1378     if (ret > 1)
 1379         ret = spoe_send_frame(appctx, buf, ret);
 1380 
 1381     switch (ret) {
 1382         case -1: /* error */
 1383         case  0: /* ignore => an error, cannot be ignored */
 1384             goto exit;
 1385 
 1386         case  1: /* retry later */
 1387             goto stop;
 1388 
 1389         default:
 1390             /* HELLO frame successfully sent, now wait for the
 1391              * reply. */
 1392             appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
 1393             goto next;
 1394     }
 1395 
 1396   next:
 1397     return 0;
 1398   stop:
 1399     return 1;
 1400   exit:
 1401     appctx->st0 = SPOE_APPCTX_ST_EXIT;
 1402     return 0;
 1403 }
 1404 
 1405 static int
 1406 spoe_handle_connecting_appctx(struct appctx *appctx)
 1407 {
 1408     struct stream_interface *si     = appctx->owner;
 1409     struct spoe_agent       *agent  = SPOE_APPCTX(appctx)->agent;
 1410     char  *frame;
 1411     int    ret;
 1412 
 1413 
 1414     if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
 1415         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO;
 1416         goto exit;
 1417     }
 1418 
 1419     if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
 1420         SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
 1421                 " - Connection timed out\n",
 1422                 (int)now.tv_sec, (int)now.tv_usec, agent->id,
 1423                 __FUNCTION__, appctx);
 1424         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOUT;
 1425         goto exit;
 1426     }
 1427 
 1428     frame = trash.area; trash.data = 0;
 1429     ret = spoe_recv_frame(appctx, frame,
 1430                   SPOE_APPCTX(appctx)->max_frame_size);
 1431     if (ret > 1) {
 1432         if (*frame == SPOE_FRM_T_AGENT_DISCON) {
 1433             appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
 1434             goto next;
 1435         }
 1436         trash.data = ret + 4;
 1437         ret = spoe_handle_agenthello_frame(appctx, frame, ret);
 1438     }
 1439 
 1440     switch (ret) {
 1441         case -1: /* error */
 1442         case  0: /* ignore => an error, cannot be ignored */
 1443             appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
 1444             goto next;
 1445 
 1446         case 1: /* retry later */
 1447             goto stop;
 1448 
 1449         default:
 1450             /* HELLO handshake is finished, set the idle timeout and
 1451              * add the applet in the list of running applets. */
 1452             _HA_ATOMIC_ADD(&agent->counters.idles, 1);
 1453             appctx->st0 = SPOE_APPCTX_ST_IDLE;
 1454             SPOE_APPCTX(appctx)->node.key = 0;
 1455             eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node);
 1456 
 1457             /* Update runtinme agent info */
 1458             HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, SPOE_APPCTX(appctx)->max_frame_size);
 1459             goto next;
 1460     }
 1461 
 1462   next:
 1463     /* Do not forget to remove processed frame from the output buffer */
 1464     if (trash.data)
 1465         co_skip(si_oc(si), trash.data);
 1466 
 1467     SPOE_APPCTX(appctx)->task->expire =
 1468         tick_add_ifset(now_ms, agent->timeout.idle);
 1469     return 0;
 1470   stop:
 1471     return 1;
 1472   exit:
 1473     appctx->st0 = SPOE_APPCTX_ST_EXIT;
 1474     return 0;
 1475 }
 1476 
 1477 
 1478 static int
 1479 spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
 1480 {
 1481     struct spoe_agent   *agent = SPOE_APPCTX(appctx)->agent;
 1482     struct spoe_context *ctx = NULL;
 1483     char *frame, *buf;
 1484     int   ret;
 1485 
 1486     /* 4 bytes are reserved at the beginning of <buf> to store the frame
 1487      * length. */
 1488     buf = trash.area; frame = buf+4;
 1489 
 1490     if (appctx->st0 == SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY) {
 1491         ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx;
 1492         ret = spoe_prepare_hafrag_frame(appctx, ctx, frame,
 1493                         SPOE_APPCTX(appctx)->max_frame_size);
 1494     }
 1495     else if (LIST_ISEMPTY(&agent->rt[tid].sending_queue)) {
 1496         *skip = 1;
 1497         ret   = 1;
 1498         goto end;
 1499     }
 1500     else {
 1501         ctx = LIST_NEXT(&agent->rt[tid].sending_queue, typeof(ctx), list);
 1502         ret = spoe_prepare_hanotify_frame(appctx, ctx, frame,
 1503                           SPOE_APPCTX(appctx)->max_frame_size);
 1504 
 1505     }
 1506 
 1507     if (ret > 1)
 1508         ret = spoe_send_frame(appctx, buf, ret);
 1509 
 1510     switch (ret) {
 1511         case -1: /* error */
 1512             appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
 1513             goto end;
 1514 
 1515         case 0: /* ignore */
 1516             if (ctx == NULL)
 1517                 goto abort_frag_frame;
 1518 
 1519             spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
 1520             LIST_DEL(&ctx->list);
 1521             LIST_INIT(&ctx->list);
 1522             _HA_ATOMIC_SUB(&agent->counters.nb_sending, 1);
 1523             spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
 1524             ctx->spoe_appctx = NULL;
 1525             ctx->state = SPOE_CTX_ST_ERROR;
 1526             ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
 1527             task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 1528             *skip = 1;
 1529             break;
 1530 
 1531         case 1: /* retry */
 1532             *skip = 1;
 1533             break;
 1534 
 1535         default:
 1536             if (ctx == NULL)
 1537                 goto abort_frag_frame;
 1538 
 1539             spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
 1540             LIST_DEL(&ctx->list);
 1541             LIST_INIT(&ctx->list);
 1542             _HA_ATOMIC_SUB(&agent->counters.nb_sending, 1);
 1543             spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
 1544             ctx->spoe_appctx = SPOE_APPCTX(appctx);
 1545             if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) ||
 1546                 (ctx->frag_ctx.flags & SPOE_FRM_FL_FIN))
 1547                 goto no_frag_frame_sent;
 1548             else
 1549                 goto frag_frame_sent;
 1550     }
 1551     goto end;
 1552 
 1553   frag_frame_sent:
 1554     appctx->st0 = SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY;
 1555     *skip = 1;
 1556     SPOE_APPCTX(appctx)->frag_ctx.ctx    = ctx;
 1557     SPOE_APPCTX(appctx)->frag_ctx.cursid = ctx->stream_id;
 1558     SPOE_APPCTX(appctx)->frag_ctx.curfid = ctx->frame_id;
 1559     ctx->state = SPOE_CTX_ST_ENCODING_MSGS;
 1560     task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 1561     goto end;
 1562 
 1563   no_frag_frame_sent:
 1564     if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
 1565         appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
 1566         LIST_ADDQ(&agent->rt[tid].waiting_queue, &ctx->list);
 1567     }
 1568     else if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) {
 1569         appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
 1570         LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
 1571     }
 1572     else {
 1573         appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK;
 1574         *skip = 1;
 1575         LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
 1576     }
 1577     _HA_ATOMIC_ADD(&agent->counters.nb_waiting, 1);
 1578     ctx->stats.tv_wait = now;
 1579     SPOE_APPCTX(appctx)->frag_ctx.ctx    = NULL;
 1580     SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
 1581     SPOE_APPCTX(appctx)->frag_ctx.curfid = 0;
 1582     SPOE_APPCTX(appctx)->cur_fpa++;
 1583 
 1584     ctx->state = SPOE_CTX_ST_WAITING_ACK;
 1585     goto end;
 1586 
 1587   abort_frag_frame:
 1588     appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
 1589     SPOE_APPCTX(appctx)->frag_ctx.ctx    = NULL;
 1590     SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
 1591     SPOE_APPCTX(appctx)->frag_ctx.curfid = 0;
 1592     goto end;
 1593 
 1594   end:
 1595     return ret;
 1596 }
 1597 
 1598 static int
 1599 spoe_handle_receiving_frame_appctx(struct appctx *appctx, int *skip)
 1600 {
 1601     struct spoe_agent   *agent = SPOE_APPCTX(appctx)->agent;
 1602     struct spoe_context *ctx = NULL;
 1603     char *frame;
 1604     int   ret;
 1605 
 1606     frame = trash.area; trash.data = 0;
 1607     ret = spoe_recv_frame(appctx, frame,
 1608                   SPOE_APPCTX(appctx)->max_frame_size);
 1609     if (ret > 1) {
 1610         if (*frame == SPOE_FRM_T_AGENT_DISCON) {
 1611             appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
 1612             ret = -1;
 1613             goto end;
 1614         }
 1615         trash.data = ret + 4;
 1616         ret = spoe_handle_agentack_frame(appctx, &ctx, frame, ret);
 1617     }
 1618     switch (ret) {
 1619         case -1: /* error */
 1620             appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
 1621             break;
 1622 
 1623         case 0: /* ignore */
 1624             break;
 1625 
 1626         case 1: /* retry */
 1627             *skip = 1;
 1628             break;
 1629 
 1630         default:
 1631             LIST_DEL(&ctx->list);
 1632             LIST_INIT(&ctx->list);
 1633             _HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
 1634             spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
 1635             ctx->stats.tv_response = now;
 1636             if (ctx->spoe_appctx) {
 1637                 ctx->spoe_appctx->cur_fpa--;
 1638                 ctx->spoe_appctx = NULL;
 1639             }
 1640             if (appctx->st0 == SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY &&
 1641                 ctx == SPOE_APPCTX(appctx)->frag_ctx.ctx) {
 1642                 appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
 1643                 SPOE_APPCTX(appctx)->frag_ctx.ctx    = NULL;
 1644                 SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
 1645                 SPOE_APPCTX(appctx)->frag_ctx.curfid = 0;
 1646             }
 1647             else if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
 1648                 appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
 1649             task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 1650             break;
 1651     }
 1652 
 1653     /* Do not forget to remove processed frame from the output buffer */
 1654     if (trash.data)
 1655         co_skip(si_oc(appctx->owner), trash.data);
 1656   end:
 1657     return ret;
 1658 }
 1659 
 1660 static int
 1661 spoe_handle_processing_appctx(struct appctx *appctx)
 1662 {
 1663     struct stream_interface *si    = appctx->owner;
 1664     struct spoe_agent       *agent = SPOE_APPCTX(appctx)->agent;
 1665     int ret, skip_sending = 0, skip_receiving = 0, active_s = 0, active_r = 0;
 1666 
 1667     if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
 1668         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO;
 1669         goto exit;
 1670     }
 1671 
 1672     if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
 1673         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOUT;
 1674         appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
 1675         appctx->st1 = SPOE_APPCTX_ERR_NONE;
 1676         goto next;
 1677     }
 1678 
 1679     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
 1680             " - process: fpa=%u/%u - appctx-state=%s - weight=%u - flags=0x%08x\n",
 1681             (int)now.tv_sec, (int)now.tv_usec, agent->id,
 1682             __FUNCTION__, appctx, SPOE_APPCTX(appctx)->cur_fpa,
 1683             agent->max_fpa, spoe_appctx_state_str[appctx->st0],
 1684             SPOE_APPCTX(appctx)->node.key, SPOE_APPCTX(appctx)->flags);
 1685 
 1686     if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
 1687         skip_sending = 1;
 1688 
 1689     /* receiving_frame loop */
 1690     while (!skip_receiving) {
 1691         ret = spoe_handle_receiving_frame_appctx(appctx, &skip_receiving);
 1692         switch (ret) {
 1693             case -1: /* error */
 1694                 goto next;
 1695 
 1696             case 0: /* ignore */
 1697                 active_r = 1;
 1698                 break;
 1699 
 1700             case 1: /* retry */
 1701                 break;
 1702 
 1703             default:
 1704                 active_r = 1;
 1705                 break;
 1706         }
 1707     }
 1708 
 1709     /* send_frame loop */
 1710     while (!skip_sending && SPOE_APPCTX(appctx)->cur_fpa < agent->max_fpa) {
 1711         ret = spoe_handle_sending_frame_appctx(appctx, &skip_sending);
 1712         switch (ret) {
 1713             case -1: /* error */
 1714                 goto next;
 1715 
 1716             case 0: /* ignore */
 1717                 if (SPOE_APPCTX(appctx)->node.key)
 1718                     SPOE_APPCTX(appctx)->node.key--;
 1719                 active_s++;
 1720                 break;
 1721 
 1722             case 1: /* retry */
 1723                 break;
 1724 
 1725             default:
 1726                 if (SPOE_APPCTX(appctx)->node.key)
 1727                     SPOE_APPCTX(appctx)->node.key--;
 1728                 active_s++;
 1729                 break;
 1730         }
 1731     }
 1732 
 1733     if (active_s || active_r) {
 1734         update_freq_ctr(&agent->rt[tid].processing_per_sec, active_s);
 1735         SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
 1736     }
 1737 
 1738     if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING && SPOE_APPCTX(appctx)->cur_fpa < agent->max_fpa) {
 1739         _HA_ATOMIC_ADD(&agent->counters.idles, 1);
 1740         appctx->st0 = SPOE_APPCTX_ST_IDLE;
 1741         eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node);
 1742     }
 1743     return 1;
 1744 
 1745   next:
 1746     SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
 1747     return 0;
 1748 
 1749   exit:
 1750     appctx->st0 = SPOE_APPCTX_ST_EXIT;
 1751     return 0;
 1752 }
 1753 
 1754 static int
 1755 spoe_handle_disconnect_appctx(struct appctx *appctx)
 1756 {
 1757     struct stream_interface *si    = appctx->owner;
 1758     struct spoe_agent       *agent = SPOE_APPCTX(appctx)->agent;
 1759     char *frame, *buf;
 1760     int   ret;
 1761 
 1762     if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
 1763         goto exit;
 1764 
 1765     if (appctx->st1 == SPOE_APPCTX_ERR_TOUT)
 1766         goto exit;
 1767 
 1768     /* 4 bytes are reserved at the beginning of <buf> to store the frame
 1769      * length. */
 1770     buf = trash.area; frame = buf+4;
 1771     ret = spoe_prepare_hadiscon_frame(appctx, frame,
 1772                       SPOE_APPCTX(appctx)->max_frame_size);
 1773     if (ret > 1)
 1774         ret = spoe_send_frame(appctx, buf, ret);
 1775 
 1776     switch (ret) {
 1777         case -1: /* error */
 1778         case  0: /* ignore  => an error, cannot be ignored */
 1779             goto exit;
 1780 
 1781         case 1: /* retry */
 1782             goto stop;
 1783 
 1784         default:
 1785             SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
 1786                     " - disconnected by HAProxy (%d): %s\n",
 1787                     (int)now.tv_sec, (int)now.tv_usec, agent->id,
 1788                     __FUNCTION__, appctx,
 1789                     SPOE_APPCTX(appctx)->status_code,
 1790                     spoe_frm_err_reasons[SPOE_APPCTX(appctx)->status_code]);
 1791 
 1792             appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
 1793             goto next;
 1794     }
 1795 
 1796   next:
 1797     SPOE_APPCTX(appctx)->task->expire =
 1798         tick_add_ifset(now_ms, agent->timeout.idle);
 1799     return 0;
 1800   stop:
 1801     return 1;
 1802   exit:
 1803     appctx->st0 = SPOE_APPCTX_ST_EXIT;
 1804     return 0;
 1805 }
 1806 
 1807 static int
 1808 spoe_handle_disconnecting_appctx(struct appctx *appctx)
 1809 {
 1810     struct stream_interface *si = appctx->owner;
 1811     char  *frame;
 1812     int    ret;
 1813 
 1814     if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
 1815         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO;
 1816         goto exit;
 1817     }
 1818 
 1819     if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
 1820         SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOUT;
 1821         goto exit;
 1822     }
 1823 
 1824     frame = trash.area; trash.data = 0;
 1825     ret = spoe_recv_frame(appctx, frame,
 1826                   SPOE_APPCTX(appctx)->max_frame_size);
 1827     if (ret > 1) {
 1828         trash.data = ret + 4;
 1829         ret = spoe_handle_agentdiscon_frame(appctx, frame, ret);
 1830     }
 1831 
 1832     switch (ret) {
 1833         case -1: /* error  */
 1834             SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
 1835                     " - error on frame (%s)\n",
 1836                     (int)now.tv_sec, (int)now.tv_usec,
 1837                     ((struct spoe_agent *)SPOE_APPCTX(appctx)->agent)->id,
 1838                     __FUNCTION__, appctx,
 1839                     spoe_frm_err_reasons[SPOE_APPCTX(appctx)->status_code]);
 1840             goto exit;
 1841 
 1842         case  0: /* ignore */
 1843             goto next;
 1844 
 1845         case  1: /* retry */
 1846             goto stop;
 1847 
 1848         default:
 1849             SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
 1850                     " - disconnected by peer (%d): %.*s\n",
 1851                     (int)now.tv_sec, (int)now.tv_usec,
 1852                     ((struct spoe_agent *)SPOE_APPCTX(appctx)->agent)->id,
 1853                     __FUNCTION__, appctx, SPOE_APPCTX(appctx)->status_code,
 1854                     SPOE_APPCTX(appctx)->rlen, SPOE_APPCTX(appctx)->reason);
 1855             goto exit;
 1856     }
 1857 
 1858   next:
 1859     /* Do not forget to remove processed frame from the output buffer */
 1860     if (trash.data)
 1861         co_skip(si_oc(appctx->owner), trash.data);
 1862 
 1863     return 0;
 1864   stop:
 1865     return 1;
 1866   exit:
 1867     appctx->st0 = SPOE_APPCTX_ST_EXIT;
 1868     return 0;
 1869 }
 1870 
 1871 /* I/O Handler processing messages exchanged with the agent */
 1872 static void
 1873 spoe_handle_appctx(struct appctx *appctx)
 1874 {
 1875     struct stream_interface *si = appctx->owner;
 1876     struct spoe_agent       *agent;
 1877 
 1878     if (SPOE_APPCTX(appctx) == NULL)
 1879         return;
 1880 
 1881     SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NONE;
 1882     agent = SPOE_APPCTX(appctx)->agent;
 1883 
 1884   switchstate:
 1885     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
 1886             " - appctx-state=%s\n",
 1887             (int)now.tv_sec, (int)now.tv_usec, agent->id,
 1888             __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);
 1889 
 1890     switch (appctx->st0) {
 1891         case SPOE_APPCTX_ST_CONNECT:
 1892             if (spoe_handle_connect_appctx(appctx))
 1893                 goto out;
 1894             goto switchstate;
 1895 
 1896         case SPOE_APPCTX_ST_CONNECTING:
 1897             if (spoe_handle_connecting_appctx(appctx))
 1898                 goto out;
 1899             goto switchstate;
 1900 
 1901         case SPOE_APPCTX_ST_IDLE:
 1902             _HA_ATOMIC_SUB(&agent->counters.idles, 1);
 1903             eb32_delete(&SPOE_APPCTX(appctx)->node);
 1904             if (stopping &&
 1905                 LIST_ISEMPTY(&agent->rt[tid].sending_queue) &&
 1906                 LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
 1907                 SPOE_APPCTX(appctx)->task->expire =
 1908                     tick_add_ifset(now_ms, agent->timeout.idle);
 1909                 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
 1910                 goto switchstate;
 1911             }
 1912             appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
 1913             /* fall through */
 1914 
 1915         case SPOE_APPCTX_ST_PROCESSING:
 1916         case SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY:
 1917         case SPOE_APPCTX_ST_WAITING_SYNC_ACK:
 1918             if (spoe_handle_processing_appctx(appctx))
 1919                 goto out;
 1920             goto switchstate;
 1921 
 1922         case SPOE_APPCTX_ST_DISCONNECT:
 1923             if (spoe_handle_disconnect_appctx(appctx))
 1924                 goto out;
 1925             goto switchstate;
 1926 
 1927         case SPOE_APPCTX_ST_DISCONNECTING:
 1928             if (spoe_handle_disconnecting_appctx(appctx))
 1929                 goto out;
 1930             goto switchstate;
 1931 
 1932         case SPOE_APPCTX_ST_EXIT:
 1933             appctx->st0 = SPOE_APPCTX_ST_END;
 1934             SPOE_APPCTX(appctx)->task->expire = TICK_ETERNITY;
 1935 
 1936             si_shutw(si);
 1937             si_shutr(si);
 1938             si_ic(si)->flags |= CF_READ_NULL;
 1939             /* fall through */
 1940 
 1941         case SPOE_APPCTX_ST_END:
 1942             return;
 1943     }
 1944   out:
 1945     if (stopping)
 1946         spoe_wakeup_appctx(appctx);
 1947 
 1948     if (SPOE_APPCTX(appctx)->task->expire != TICK_ETERNITY)
 1949         task_queue(SPOE_APPCTX(appctx)->task);
 1950 }
 1951 
 1952 struct applet spoe_applet = {
 1953     .obj_type = OBJ_TYPE_APPLET,
 1954     .name = "<SPOE>", /* used for logging */
 1955     .fct = spoe_handle_appctx,
 1956     .release = spoe_release_appctx,
 1957 };
 1958 
 1959 /* Create a SPOE applet. On success, the created applet is returned, else
 1960  * NULL. */
 1961 static struct appctx *
 1962 spoe_create_appctx(struct spoe_config *conf)
 1963 {
 1964     struct appctx      *appctx;
 1965     struct session     *sess;
 1966     struct stream      *strm;
 1967 
 1968     if ((appctx = appctx_new(&spoe_applet, tid_bit)) == NULL)
 1969         goto out_error;
 1970 
 1971     appctx->ctx.spoe.ptr = pool_alloc_dirty(pool_head_spoe_appctx);
 1972     if (SPOE_APPCTX(appctx) == NULL)
 1973         goto out_free_appctx;
 1974     memset(appctx->ctx.spoe.ptr, 0, pool_head_spoe_appctx->size);
 1975 
 1976     appctx->st0 = SPOE_APPCTX_ST_CONNECT;
 1977     if ((SPOE_APPCTX(appctx)->task = task_new(tid_bit)) == NULL)
 1978         goto out_free_spoe_appctx;
 1979 
 1980     SPOE_APPCTX(appctx)->owner           = appctx;
 1981     SPOE_APPCTX(appctx)->task->process   = spoe_process_appctx;
 1982     SPOE_APPCTX(appctx)->task->context   = appctx;
 1983     SPOE_APPCTX(appctx)->agent           = conf->agent;
 1984     SPOE_APPCTX(appctx)->version         = 0;
 1985     SPOE_APPCTX(appctx)->max_frame_size  = conf->agent->max_frame_size;
 1986     SPOE_APPCTX(appctx)->flags           = 0;
 1987     SPOE_APPCTX(appctx)->status_code     = SPOE_FRM_ERR_NONE;
 1988     SPOE_APPCTX(appctx)->buffer          = BUF_NULL;
 1989     SPOE_APPCTX(appctx)->cur_fpa         = 0;
 1990 
 1991     LIST_INIT(&SPOE_APPCTX(appctx)->buffer_wait.list);
 1992     SPOE_APPCTX(appctx)->buffer_wait.target = appctx;
 1993     SPOE_APPCTX(appctx)->buffer_wait.wakeup_cb = (int (*)(void *))spoe_wakeup_appctx;
 1994 
 1995     LIST_INIT(&SPOE_APPCTX(appctx)->list);
 1996     LIST_INIT(&SPOE_APPCTX(appctx)->waiting_queue);
 1997 
 1998     sess = session_new(&conf->agent_fe, NULL, &appctx->obj_type);
 1999     if (!sess)
 2000         goto out_free_spoe;
 2001 
 2002     if ((strm = stream_new(sess, &appctx->obj_type)) == NULL)
 2003         goto out_free_sess;
 2004 
 2005     stream_set_backend(strm, conf->agent->b.be);
 2006 
 2007     /* applet is waiting for data */
 2008     si_cant_get(&strm->si[0]);
 2009     appctx_wakeup(appctx);
 2010 
 2011     strm->do_log = NULL;
 2012     strm->res.flags |= CF_READ_DONTWAIT;
 2013 
 2014     HA_SPIN_LOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
 2015     LIST_ADDQ(&conf->agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
 2016     HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
 2017     _HA_ATOMIC_ADD(&conf->agent->counters.applets, 1);
 2018 
 2019     task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
 2020     task_wakeup(strm->task, TASK_WOKEN_INIT);
 2021     return appctx;
 2022 
 2023     /* Error unrolling */
 2024  out_free_sess:
 2025     session_free(sess);
 2026  out_free_spoe:
 2027     task_destroy(SPOE_APPCTX(appctx)->task);
 2028  out_free_spoe_appctx:
 2029     pool_free(pool_head_spoe_appctx, SPOE_APPCTX(appctx));
 2030  out_free_appctx:
 2031     appctx_free(appctx);
 2032  out_error:
 2033     return NULL;
 2034 }
 2035 
 2036 static int
 2037 spoe_queue_context(struct spoe_context *ctx)
 2038 {
 2039     struct spoe_config *conf = FLT_CONF(ctx->filter);
 2040     struct spoe_agent  *agent = conf->agent;
 2041     struct appctx      *appctx;
 2042     struct spoe_appctx *spoe_appctx;
 2043 
 2044     /* Check if we need to create a new SPOE applet or not. */
 2045     if (!eb_is_empty(&agent->rt[tid].idle_applets) &&
 2046         agent->rt[tid].processing < read_freq_ctr(&agent->rt[tid].processing_per_sec))
 2047         goto end;
 2048 
 2049     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 2050             " - try to create new SPOE appctx\n",
 2051             (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
 2052             ctx->strm);
 2053 
 2054     /* Do not try to create a new applet if there is no server up for the
 2055      * agent's backend. */
 2056     if (!agent->b.be->srv_act && !agent->b.be->srv_bck) {
 2057         SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 2058                 " - cannot create SPOE appctx: no server up\n",
 2059                 (int)now.tv_sec, (int)now.tv_usec, agent->id,
 2060                 __FUNCTION__, ctx->strm);
 2061         goto end;
 2062     }
 2063 
 2064     /* Do not try to create a new applet if we have reached the maximum of
 2065      * connection per seconds */
 2066     if (agent->cps_max > 0) {
 2067         if (!freq_ctr_remain(&agent->rt[tid].conn_per_sec, agent->cps_max, 0)) {
 2068             SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 2069                     " - cannot create SPOE appctx: max CPS reached\n",
 2070                     (int)now.tv_sec, (int)now.tv_usec, agent->id,
 2071                     __FUNCTION__, ctx->strm);
 2072             goto end;
 2073         }
 2074     }
 2075 
 2076     appctx = spoe_create_appctx(conf);
 2077     if (appctx == NULL) {
 2078         SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 2079                 " - failed to create SPOE appctx\n",
 2080                 (int)now.tv_sec, (int)now.tv_usec, agent->id,
 2081                 __FUNCTION__, ctx->strm);
 2082         send_log(&conf->agent_fe, LOG_EMERG,
 2083              "SPOE: [%s] failed to create SPOE applet\n",
 2084              agent->id);
 2085 
 2086         goto end;
 2087     }
 2088 
 2089     /* Increase the per-process number of cumulated connections */
 2090     if (agent->cps_max > 0)
 2091         update_freq_ctr(&agent->rt[tid].conn_per_sec, 1);
 2092 
 2093   end:
 2094     /* The only reason to return an error is when there is no applet */
 2095     if (LIST_ISEMPTY(&agent->rt[tid].applets)) {
 2096         ctx->status_code = SPOE_CTX_ERR_RES;
 2097         return -1;
 2098     }
 2099 
 2100     /* Add the SPOE context in the sending queue if the stream has no applet
 2101      * already assigned and wakeup all idle applets. Otherwise, don't queue
 2102      * it. */
 2103     _HA_ATOMIC_ADD(&agent->counters.nb_sending, 1);
 2104     spoe_update_stat_time(&ctx->stats.tv_request, &ctx->stats.t_request);
 2105     ctx->stats.tv_queue = now;
 2106     if (ctx->spoe_appctx)
 2107         return 1;
 2108     LIST_ADDQ(&agent->rt[tid].sending_queue, &ctx->list);
 2109 
 2110     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 2111             " - Add stream in sending queue"
 2112             " - applets=%u - idles=%u - processing=%u\n",
 2113             (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
 2114             ctx->strm, agent->counters.applets, agent->counters.idles,
 2115             agent->rt[tid].processing);
 2116 
 2117     /* Finally try to wakeup an IDLE applet. */
 2118     if (!eb_is_empty(&agent->rt[tid].idle_applets)) {
 2119         struct eb32_node *node;
 2120 
 2121         node = eb32_first(&agent->rt[tid].idle_applets);
 2122         spoe_appctx = eb32_entry(node, struct spoe_appctx, node);
 2123         if (node && spoe_appctx) {
 2124             eb32_delete(&spoe_appctx->node);
 2125             spoe_appctx->node.key++;
 2126             eb32_insert(&agent->rt[tid].idle_applets, &spoe_appctx->node);
 2127             spoe_wakeup_appctx(spoe_appctx->owner);
 2128         }
 2129     }
 2130     return 1;
 2131 }
 2132 
 2133 /***************************************************************************
 2134  * Functions that encode SPOE messages
 2135  **************************************************************************/
 2136 /* Encode a SPOE message. Info in <ctx->frag_ctx>, if any, are used to handle
 2137  * fragmented_content. If the next message can be processed, it returns 0. If
 2138  * the message is too big, it returns -1.*/
 2139 static int
 2140 spoe_encode_message(struct stream *s, struct spoe_context *ctx,
 2141             struct spoe_message *msg, int dir,
 2142             char **buf, char *end)
 2143 {
 2144     struct sample   *smp;
 2145     struct spoe_arg *arg;
 2146     int ret;
 2147 
 2148     if (msg->cond) {
 2149         ret = acl_exec_cond(msg->cond, s->be, s->sess, s, dir|SMP_OPT_FINAL);
 2150         ret = acl_pass(ret);
 2151         if (msg->cond->pol == ACL_COND_UNLESS)
 2152             ret = !ret;
 2153 
 2154         /* the rule does not match */
 2155         if (!ret)
 2156             goto next;
 2157     }
 2158 
 2159         /* Resume encoding of a SPOE argument */
 2160     if (ctx->frag_ctx.curarg != NULL) {
 2161         arg = ctx->frag_ctx.curarg;
 2162         goto encode_argument;
 2163     }
 2164 
 2165     if (ctx->frag_ctx.curoff != UINT_MAX)
 2166         goto encode_msg_payload;
 2167 
 2168     /* Check if there is enough space for the message name and the
 2169      * number of arguments. It implies <msg->id_len> is encoded on 2
 2170      * bytes, at most (< 2288). */
 2171     if (*buf + 2 + msg->id_len + 1 > end)
 2172         goto too_big;
 2173 
 2174     /* Encode the message name */
 2175     if (spoe_encode_buffer(msg->id, msg->id_len, buf, end) == -1)
 2176         goto too_big;
 2177 
 2178     /* Set the number of arguments for this message */
 2179     **buf = msg->nargs;
 2180     (*buf)++;
 2181 
 2182     ctx->frag_ctx.curoff = 0;
 2183   encode_msg_payload:
 2184 
 2185     /* Loop on arguments */
 2186     list_for_each_entry(arg, &msg->args, list) {
 2187         ctx->frag_ctx.curarg = arg;
 2188         ctx->frag_ctx.curoff = UINT_MAX;
 2189         ctx->frag_ctx.curlen = 0;
 2190 
 2191       encode_argument:
 2192         if (ctx->frag_ctx.curoff != UINT_MAX)
 2193             goto encode_arg_value;
 2194 
 2195         /* Encode the argument name as a string. It can by NULL */
 2196         if (spoe_encode_buffer(arg->name, arg->name_len, buf, end) == -1)
 2197             goto too_big;
 2198 
 2199         ctx->frag_ctx.curoff = 0;
 2200       encode_arg_value:
 2201 
 2202         /* Fetch the argument value */
 2203         smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
 2204         if (smp) {
 2205             smp->ctx.a[0] = &ctx->frag_ctx.curlen;
 2206             smp->ctx.a[1] = &ctx->frag_ctx.curoff;
 2207         }
 2208         ret = spoe_encode_data(smp, buf, end);
 2209         if (ret == -1 || ctx->frag_ctx.curoff)
 2210             goto too_big;
 2211     }
 2212 
 2213   next:
 2214     return 0;
 2215 
 2216   too_big:
 2217     return -1;
 2218 }
 2219 
 2220 /* Encode list of SPOE messages. Info in <ctx->frag_ctx>, if any, are used to
 2221  * handle fragmented content. On success it returns 1. If an error occurred, -1
 2222  * is returned. If nothing has been encoded, it returns 0 (this is only possible
 2223  * for unfragmented payload). */
 2224 static int
 2225 spoe_encode_messages(struct stream *s, struct spoe_context *ctx,
 2226              struct list *messages, int dir, int type)
 2227 {
 2228     struct spoe_config  *conf = FLT_CONF(ctx->filter);
 2229     struct spoe_agent   *agent = conf->agent;
 2230     struct spoe_message *msg;
 2231     char   *p, *end;
 2232 
 2233     p   = b_head(&ctx->buffer);
 2234     end =  p + agent->rt[tid].frame_size - FRAME_HDR_SIZE;
 2235 
 2236     if (type == SPOE_MSGS_BY_EVENT) { /* Loop on messages by event */
 2237         /* Resume encoding of a SPOE message */
 2238         if (ctx->frag_ctx.curmsg != NULL) {
 2239             msg = ctx->frag_ctx.curmsg;
 2240             goto encode_evt_message;
 2241         }
 2242 
 2243         list_for_each_entry(msg, messages, by_evt) {
 2244             ctx->frag_ctx.curmsg = msg;
 2245             ctx->frag_ctx.curarg = NULL;
 2246             ctx->frag_ctx.curoff = UINT_MAX;
 2247 
 2248         encode_evt_message:
 2249             if (spoe_encode_message(s, ctx, msg, dir, &p, end) == -1)
 2250                 goto too_big;
 2251         }
 2252     }
 2253     else if (type == SPOE_MSGS_BY_GROUP) { /* Loop on messages by group */
 2254         /* Resume encoding of a SPOE message */
 2255         if (ctx->frag_ctx.curmsg != NULL) {
 2256             msg = ctx->frag_ctx.curmsg;
 2257             goto encode_grp_message;
 2258         }
 2259 
 2260         list_for_each_entry(msg, messages, by_grp) {
 2261             ctx->frag_ctx.curmsg = msg;
 2262             ctx->frag_ctx.curarg = NULL;
 2263             ctx->frag_ctx.curoff = UINT_MAX;
 2264 
 2265         encode_grp_message:
 2266             if (spoe_encode_message(s, ctx, msg, dir, &p, end) == -1)
 2267                 goto too_big;
 2268         }
 2269     }
 2270     else
 2271         goto skip;
 2272 
 2273 
 2274     /* nothing has been encoded for an unfragmented payload */
 2275     if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) && p == b_head(&ctx->buffer))
 2276         goto skip;
 2277 
 2278     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 2279             " - encode %s messages - spoe_appctx=%p"
 2280             "- max_size=%u - encoded=%ld\n",
 2281             (int)now.tv_sec, (int)now.tv_usec,
 2282             agent->id, __FUNCTION__, s,
 2283             ((ctx->flags & SPOE_CTX_FL_FRAGMENTED) ? "last fragment of" : "unfragmented"),
 2284             ctx->spoe_appctx, (agent->rt[tid].frame_size - FRAME_HDR_SIZE),
 2285             p - b_head(&ctx->buffer));
 2286 
 2287     b_set_data(&ctx->buffer, p - b_head(&ctx->buffer));
 2288     ctx->frag_ctx.curmsg = NULL;
 2289     ctx->frag_ctx.curarg = NULL;
 2290     ctx->frag_ctx.curoff = 0;
 2291     ctx->frag_ctx.flags  = SPOE_FRM_FL_FIN;
 2292 
 2293     return 1;
 2294 
 2295   too_big:
 2296     /* Return an error if fragmentation is unsupported or if nothing has
 2297      * been encoded because its too big and not splittable. */
 2298     if (!(agent->flags & SPOE_FL_SND_FRAGMENTATION) || p == b_head(&ctx->buffer)) {
 2299         ctx->status_code = SPOE_CTX_ERR_TOO_BIG;
 2300         return -1;
 2301     }
 2302 
 2303     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 2304             " - encode fragmented messages - spoe_appctx=%p"
 2305             " - curmsg=%p - curarg=%p - curoff=%u"
 2306             " - max_size=%u - encoded=%ld\n",
 2307             (int)now.tv_sec, (int)now.tv_usec,
 2308             agent->id, __FUNCTION__, s, ctx->spoe_appctx,
 2309             ctx->frag_ctx.curmsg, ctx->frag_ctx.curarg, ctx->frag_ctx.curoff,
 2310             (agent->rt[tid].frame_size - FRAME_HDR_SIZE), p - b_head(&ctx->buffer));
 2311 
 2312     b_set_data(&ctx->buffer, p - b_head(&ctx->buffer));
 2313     ctx->flags |= SPOE_CTX_FL_FRAGMENTED;
 2314     ctx->frag_ctx.flags &= ~SPOE_FRM_FL_FIN;
 2315     return 1;
 2316 
 2317   skip:
 2318     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 2319             " - skip the frame because nothing has been encoded\n",
 2320             (int)now.tv_sec, (int)now.tv_usec,
 2321             agent->id, __FUNCTION__, s);
 2322     return 0;
 2323 }
 2324 
 2325 
 2326 /***************************************************************************
 2327  * Functions that handle SPOE actions
 2328  **************************************************************************/
 2329 /* Helper function to set a variable */
 2330 static void
 2331 spoe_set_var(struct spoe_context *ctx, char *scope, char *name, int len,
 2332          struct sample *smp)
 2333 {
 2334     struct spoe_config *conf = FLT_CONF(ctx->filter);
 2335     struct spoe_agent  *agent = conf->agent;
 2336     char                varname[64];
 2337 
 2338     memset(varname, 0, sizeof(varname));
 2339     len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
 2340                scope, agent->var_pfx, len, name);
 2341     if (agent->flags & SPOE_FL_FORCE_SET_VAR)
 2342         vars_set_by_name(varname, len, smp);
 2343     else
 2344         vars_set_by_name_ifexist(varname, len, smp);
 2345 }
 2346 
 2347 /* Helper function to unset a variable */
 2348 static void
 2349 spoe_unset_var(struct spoe_context *ctx, char *scope, char *name, int len,
 2350            struct sample *smp)
 2351 {
 2352     struct spoe_config *conf = FLT_CONF(ctx->filter);
 2353     struct spoe_agent  *agent = conf->agent;
 2354     char                varname[64];
 2355 
 2356     memset(varname, 0, sizeof(varname));
 2357     len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
 2358                scope, agent->var_pfx, len, name);
 2359     vars_unset_by_name_ifexist(varname, len, smp);
 2360 }
 2361 
 2362 
 2363 static inline int
 2364 spoe_decode_action_set_var(struct stream *s, struct spoe_context *ctx,
 2365                char **buf, char *end, int dir)
 2366 {
 2367     char         *str, *scope, *p = *buf;
 2368     struct sample smp;
 2369     uint64_t      sz;
 2370     int           ret;
 2371 
 2372     if (p + 2 >= end)
 2373         goto skip;
 2374 
 2375     /* SET-VAR requires 3 arguments */
 2376     if (*p++ != 3)
 2377         goto skip;
 2378 
 2379     switch (*p++) {
 2380         case SPOE_SCOPE_PROC: scope = "proc"; break;
 2381         case SPOE_SCOPE_SESS: scope = "sess"; break;
 2382         case SPOE_SCOPE_TXN : scope = "txn";  break;
 2383         case SPOE_SCOPE_REQ : scope = "req";  break;
 2384         case SPOE_SCOPE_RES : scope = "res";  break;
 2385         default: goto skip;
 2386     }
 2387 
 2388     if (spoe_decode_buffer(&p, end, &str, &sz) == -1)
 2389         goto skip;
 2390     memset(&smp, 0, sizeof(smp));
 2391     smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
 2392 
 2393     if (spoe_decode_data(&p, end, &smp) == -1)
 2394         goto skip;
 2395 
 2396     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 2397             " - set-var '%s.%s.%.*s'\n",
 2398             (int)now.tv_sec, (int)now.tv_usec,
 2399             ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
 2400             __FUNCTION__, s, scope,
 2401             ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
 2402             (int)sz, str);
 2403 
 2404     spoe_set_var(ctx, scope, str, sz, &smp);
 2405 
 2406     ret  = (p - *buf);
 2407     *buf = p;
 2408     return ret;
 2409   skip:
 2410     return 0;
 2411 }
 2412 
 2413 static inline int
 2414 spoe_decode_action_unset_var(struct stream *s, struct spoe_context *ctx,
 2415                  char **buf, char *end, int dir)
 2416 {
 2417     char         *str, *scope, *p = *buf;
 2418     struct sample smp;
 2419     uint64_t      sz;
 2420     int           ret;
 2421 
 2422     if (p + 2 >= end)
 2423         goto skip;
 2424 
 2425     /* UNSET-VAR requires 2 arguments */
 2426     if (*p++ != 2)
 2427         goto skip;
 2428 
 2429     switch (*p++) {
 2430         case SPOE_SCOPE_PROC: scope = "proc"; break;
 2431         case SPOE_SCOPE_SESS: scope = "sess"; break;
 2432         case SPOE_SCOPE_TXN : scope = "txn";  break;
 2433         case SPOE_SCOPE_REQ : scope = "req";  break;
 2434         case SPOE_SCOPE_RES : scope = "res";  break;
 2435         default: goto skip;
 2436     }
 2437 
 2438     if (spoe_decode_buffer(&p, end, &str, &sz) == -1)
 2439         goto skip;
 2440     memset(&smp, 0, sizeof(smp));
 2441     smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
 2442 
 2443     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 2444             " - unset-var '%s.%s.%.*s'\n",
 2445             (int)now.tv_sec, (int)now.tv_usec,
 2446             ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
 2447             __FUNCTION__, s, scope,
 2448             ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
 2449             (int)sz, str);
 2450 
 2451     spoe_unset_var(ctx, scope, str, sz, &smp);
 2452 
 2453     ret  = (p - *buf);
 2454     *buf = p;
 2455     return ret;
 2456   skip:
 2457     return 0;
 2458 }
 2459 
 2460 /* Process SPOE actions for a specific event. It returns 1 on success. If an
 2461  * error occurred, 0 is returned. */
 2462 static int
 2463 spoe_process_actions(struct stream *s, struct spoe_context *ctx, int dir)
 2464 {
 2465     char *p, *end;
 2466     int   ret;
 2467 
 2468     p   = b_head(&ctx->buffer);
 2469     end = p + b_data(&ctx->buffer);
 2470 
 2471     while (p < end)  {
 2472         enum spoe_action_type type;
 2473 
 2474         type = *p++;
 2475         switch (type) {
 2476             case SPOE_ACT_T_SET_VAR:
 2477                 ret = spoe_decode_action_set_var(s, ctx, &p, end, dir);
 2478                 if (!ret)
 2479                     goto skip;
 2480                 break;
 2481 
 2482             case SPOE_ACT_T_UNSET_VAR:
 2483                 ret = spoe_decode_action_unset_var(s, ctx, &p, end, dir);
 2484                 if (!ret)
 2485                     goto skip;
 2486                 break;
 2487 
 2488             default:
 2489                 goto skip;
 2490         }
 2491     }
 2492 
 2493     return 1;
 2494   skip:
 2495     return 0;
 2496 }
 2497 
 2498 /***************************************************************************
 2499  * Functions that process SPOE events
 2500  **************************************************************************/
 2501 static void
 2502 spoe_update_stats(struct stream *s, struct spoe_agent *agent,
 2503           struct spoe_context *ctx, int dir)
 2504 {
 2505     if (!tv_iszero(&ctx->stats.tv_start)) {
 2506         spoe_update_stat_time(&ctx->stats.tv_start, &ctx->stats.t_process);
 2507         ctx->stats.t_total  += ctx->stats.t_process;
 2508         tv_zero(&ctx->stats.tv_request);
 2509         tv_zero(&ctx->stats.tv_queue);
 2510         tv_zero(&ctx->stats.tv_wait);
 2511         tv_zero(&ctx->stats.tv_response);
 2512     }
 2513 
 2514     if (agent->var_t_process) {
 2515         struct sample smp;
 2516 
 2517         memset(&smp, 0, sizeof(smp));
 2518         smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
 2519         smp.data.u.sint = ctx->stats.t_process;
 2520         smp.data.type   = SMP_T_SINT;
 2521 
 2522         spoe_set_var(ctx, "txn", agent->var_t_process,
 2523                  strlen(agent->var_t_process), &smp);
 2524     }
 2525 
 2526     if (agent->var_t_total) {
 2527         struct sample smp;
 2528 
 2529         memset(&smp, 0, sizeof(smp));
 2530         smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
 2531         smp.data.u.sint = ctx->stats.t_total;
 2532         smp.data.type   = SMP_T_SINT;
 2533 
 2534         spoe_set_var(ctx, "txn", agent->var_t_total,
 2535                  strlen(agent->var_t_total), &smp);
 2536     }
 2537 }
 2538 
 2539 static void
 2540 spoe_handle_processing_error(struct stream *s, struct spoe_agent *agent,
 2541                  struct spoe_context *ctx, int dir)
 2542 {
 2543     if (agent->eps_max > 0)
 2544         update_freq_ctr(&agent->rt[tid].err_per_sec, 1);
 2545 
 2546     if (agent->var_on_error) {
 2547         struct sample smp;
 2548 
 2549         memset(&smp, 0, sizeof(smp));
 2550         smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
 2551         smp.data.u.sint = ctx->status_code;
 2552         smp.data.type   = SMP_T_BOOL;
 2553 
 2554         spoe_set_var(ctx, "txn", agent->var_on_error,
 2555                  strlen(agent->var_on_error), &smp);
 2556     }
 2557 
 2558     ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
 2559               ? SPOE_CTX_ST_READY
 2560               : SPOE_CTX_ST_NONE);
 2561 }
 2562 
 2563 static inline int
 2564 spoe_start_processing(struct spoe_agent *agent, struct spoe_context *ctx, int dir)
 2565 {
 2566     /* If a process is already started for this SPOE context, retry
 2567      * later. */
 2568     if (ctx->flags & SPOE_CTX_FL_PROCESS)
 2569         return 0;
 2570 
 2571     agent->rt[tid].processing++;
 2572     ctx->stats.tv_start   = now;
 2573     ctx->stats.tv_request = now;
 2574     ctx->stats.t_request  = -1;
 2575     ctx->stats.t_queue    = -1;
 2576     ctx->stats.t_waiting  = -1;
 2577     ctx->stats.t_response = -1;
 2578     ctx->stats.t_process  = -1;
 2579 
 2580     ctx->status_code = 0;
 2581 
 2582     /* Set the right flag to prevent request and response processing
 2583      * in same time. */
 2584     ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
 2585                ? SPOE_CTX_FL_REQ_PROCESS
 2586                : SPOE_CTX_FL_RSP_PROCESS);
 2587     return 1;
 2588 }
 2589 
 2590 static inline void
 2591 spoe_stop_processing(struct spoe_agent *agent, struct spoe_context *ctx)
 2592 {
 2593     struct spoe_appctx *sa = ctx->spoe_appctx;
 2594 
 2595     if (!(ctx->flags & SPOE_CTX_FL_PROCESS))
 2596         return;
 2597     _HA_ATOMIC_ADD(&agent->counters.nb_processed, 1);
 2598     if (sa) {
 2599         if (sa->frag_ctx.ctx == ctx) {
 2600             sa->frag_ctx.ctx = NULL;
 2601             spoe_wakeup_appctx(sa->owner);
 2602         }
 2603         else
 2604             sa->cur_fpa--;
 2605     }
 2606 
 2607     /* Reset the flag to allow next processing */
 2608     agent->rt[tid].processing--;
 2609     ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED);
 2610 
 2611     /* Reset processing timer */
 2612     ctx->process_exp = TICK_ETERNITY;
 2613 
 2614     spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
 2615 
 2616     ctx->spoe_appctx          = NULL;
 2617     ctx->frag_ctx.curmsg      = NULL;
 2618     ctx->frag_ctx.curarg      = NULL;
 2619     ctx->frag_ctx.curoff      = 0;
 2620     ctx->frag_ctx.flags       = 0;
 2621 
 2622     if (!LIST_ISEMPTY(&ctx->list)) {
 2623         if (ctx->state == SPOE_CTX_ST_SENDING_MSGS)
 2624             _HA_ATOMIC_SUB(&agent->counters.nb_sending, 1);
 2625         else
 2626             _HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
 2627 
 2628         LIST_DEL(&ctx->list);
 2629         LIST_INIT(&ctx->list);
 2630     }
 2631 }
 2632 
 2633 /* Process a list of SPOE messages. First, this functions will process messages
 2634  *  and send them to an agent in a NOTIFY frame. Then, it will wait a ACK frame
 2635  *  to process corresponding actions. During all the processing, it returns 0
 2636  *  and it returns 1 when the processing is finished. If an error occurred, -1
 2637  *  is returned. */
 2638 static int
 2639 spoe_process_messages(struct stream *s, struct spoe_context *ctx,
 2640               struct list *messages, int dir, int type)
 2641 {
 2642     struct spoe_config *conf = FLT_CONF(ctx->filter);
 2643     struct spoe_agent  *agent = conf->agent;
 2644     int                 ret = 1;
 2645 
 2646     if (ctx->state == SPOE_CTX_ST_ERROR)
 2647         goto end;
 2648 
 2649     if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
 2650         SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 2651                 " - failed to process messages: timeout\n",
 2652                 (int)now.tv_sec, (int)now.tv_usec,
 2653                 agent->id, __FUNCTION__, s);
 2654         ctx->status_code = SPOE_CTX_ERR_TOUT;
 2655         goto end;
 2656     }
 2657 
 2658     if (ctx->state == SPOE_CTX_ST_READY) {
 2659         if (agent->eps_max > 0) {
 2660             if (!freq_ctr_remain(&agent->rt[tid].err_per_sec, agent->eps_max, 0)) {
 2661                 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 2662                         " - skip processing of messages: max EPS reached\n",
 2663                         (int)now.tv_sec, (int)now.tv_usec,
 2664                         agent->id, __FUNCTION__, s);
 2665                 goto skip;
 2666             }
 2667         }
 2668 
 2669         if (!tick_isset(ctx->process_exp)) {
 2670             ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
 2671             s->task->expire  = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
 2672                               ctx->process_exp);
 2673         }
 2674         ret = spoe_start_processing(agent, ctx, dir);
 2675         if (!ret)
 2676             goto out;
 2677 
 2678         ctx->state = SPOE_CTX_ST_ENCODING_MSGS;
 2679         /* fall through */
 2680     }
 2681 
 2682     if (ctx->state == SPOE_CTX_ST_ENCODING_MSGS) {
 2683         if (tv_iszero(&ctx->stats.tv_request))
 2684             ctx->stats.tv_request = now;
 2685         if (!spoe_acquire_buffer(&ctx->buffer, &ctx->buffer_wait))
 2686             goto out;
 2687         ret = spoe_encode_messages(s, ctx, messages, dir, type);
 2688         if (ret < 0)
 2689             goto end;
 2690         if (!ret)
 2691             goto skip;
 2692         if (spoe_queue_context(ctx) < 0)
 2693             goto end;
 2694         ctx->state = SPOE_CTX_ST_SENDING_MSGS;
 2695     }
 2696 
 2697     if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
 2698         if (ctx->spoe_appctx)
 2699             spoe_wakeup_appctx(ctx->spoe_appctx->owner);
 2700         ret = 0;
 2701         goto out;
 2702     }
 2703 
 2704     if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
 2705         ret = 0;
 2706         goto out;
 2707     }
 2708 
 2709     if (ctx->state == SPOE_CTX_ST_DONE) {
 2710         spoe_process_actions(s, ctx, dir);
 2711         ret = 1;
 2712         ctx->frame_id++;
 2713         ctx->state = SPOE_CTX_ST_READY;
 2714         spoe_update_stat_time(&ctx->stats.tv_response, &ctx->stats.t_response);
 2715         goto end;
 2716     }
 2717 
 2718   out:
 2719     return ret;
 2720 
 2721   skip:
 2722     tv_zero(&ctx->stats.tv_start);
 2723     ctx->state = SPOE_CTX_ST_READY;
 2724     spoe_stop_processing(agent, ctx);
 2725     return 1;
 2726 
 2727   end:
 2728     spoe_update_stats(s, agent, ctx, dir);
 2729     spoe_stop_processing(agent, ctx);
 2730     if (ctx->status_code) {
 2731         _HA_ATOMIC_ADD(&agent->counters.nb_errors, 1);
 2732         spoe_handle_processing_error(s, agent, ctx, dir);
 2733         ret = 1;
 2734     }
 2735     return ret;
 2736 }
 2737 
 2738 /* Process a SPOE group, ie the list of messages attached to the group <grp>.
 2739  * See spoe_process_message for details. */
 2740 static int
 2741 spoe_process_group(struct stream *s, struct spoe_context *ctx,
 2742            struct spoe_group *group, int dir)
 2743 {
 2744     struct spoe_config *conf = FLT_CONF(ctx->filter);
 2745     struct spoe_agent  *agent = conf->agent;
 2746     int ret;
 2747 
 2748     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 2749             " - ctx-state=%s - Process messages for group=%s\n",
 2750             (int)now.tv_sec, (int)now.tv_usec, agent->id,
 2751             __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
 2752             group->id);
 2753 
 2754     if (LIST_ISEMPTY(&group->messages))
 2755         return 1;
 2756 
 2757     ret = spoe_process_messages(s, ctx, &group->messages, dir, SPOE_MSGS_BY_GROUP);
 2758     if (ret && ctx->stats.t_process != -1) {
 2759         SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 2760                 " - <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu %u/%u\n",
 2761                 (int)now.tv_sec, (int)now.tv_usec, agent->id,
 2762                 __FUNCTION__, s, group->id, s->uniq_id, ctx->status_code,
 2763                 ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
 2764                 ctx->stats.t_response, ctx->stats.t_process,
 2765                 agent->counters.idles, agent->counters.applets,
 2766                 agent->counters.nb_sending, agent->counters.nb_waiting,
 2767                 agent->counters.nb_errors, agent->counters.nb_processed,
 2768                 agent->rt[tid].processing, read_freq_ctr(&agent->rt[tid].processing_per_sec));
 2769         if (ctx->status_code || !(conf->agent_fe.options2 & PR_O2_NOLOGNORM))
 2770             send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
 2771                  "SPOE: [%s] <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu\n",
 2772                  agent->id, group->id, s->uniq_id, ctx->status_code,
 2773                  ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
 2774                  ctx->stats.t_response, ctx->stats.t_process,
 2775                  agent->counters.idles, agent->counters.applets,
 2776                  agent->counters.nb_sending, agent->counters.nb_waiting,
 2777                  agent->counters.nb_errors, agent->counters.nb_processed);
 2778     }
 2779     return ret;
 2780 }
 2781 
 2782 /* Process a SPOE event, ie the list of messages attached to the event <ev>.
 2783  * See spoe_process_message for details. */
 2784 static int
 2785 spoe_process_event(struct stream *s, struct spoe_context *ctx,
 2786            enum spoe_event ev)
 2787 {
 2788     struct spoe_config *conf = FLT_CONF(ctx->filter);
 2789     struct spoe_agent  *agent = conf->agent;
 2790     int dir, ret;
 2791 
 2792     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 2793             " - ctx-state=%s - Process messages for event=%s\n",
 2794             (int)now.tv_sec, (int)now.tv_usec, agent->id,
 2795             __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
 2796             spoe_event_str[ev]);
 2797 
 2798     dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
 2799 
 2800     if (LIST_ISEMPTY(&(ctx->events[ev])))
 2801         return 1;
 2802 
 2803     ret = spoe_process_messages(s, ctx, &(ctx->events[ev]), dir, SPOE_MSGS_BY_EVENT);
 2804     if (ret && ctx->stats.t_process != -1) {
 2805         SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 2806                 " - <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu %u/%u\n",
 2807                 (int)now.tv_sec, (int)now.tv_usec, agent->id,
 2808                 __FUNCTION__, s, spoe_event_str[ev], s->uniq_id, ctx->status_code,
 2809                 ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
 2810                 ctx->stats.t_response, ctx->stats.t_process,
 2811                 agent->counters.idles, agent->counters.applets,
 2812                 agent->counters.nb_sending, agent->counters.nb_waiting,
 2813                 agent->counters.nb_errors, agent->counters.nb_processed,
 2814                 agent->rt[tid].processing, read_freq_ctr(&agent->rt[tid].processing_per_sec));
 2815         if (ctx->status_code || !(conf->agent_fe.options2 & PR_O2_NOLOGNORM))
 2816             send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
 2817                  "SPOE: [%s] <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu\n",
 2818                  agent->id, spoe_event_str[ev], s->uniq_id, ctx->status_code,
 2819                  ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
 2820                  ctx->stats.t_response, ctx->stats.t_process,
 2821                  agent->counters.idles, agent->counters.applets,
 2822                  agent->counters.nb_sending, agent->counters.nb_waiting,
 2823                  agent->counters.nb_errors, agent->counters.nb_processed);
 2824     }
 2825     return ret;
 2826 }
 2827 
 2828 /***************************************************************************
 2829  * Functions that create/destroy SPOE contexts
 2830  **************************************************************************/
 2831 static int
 2832 spoe_acquire_buffer(struct buffer *buf, struct buffer_wait *buffer_wait)
 2833 {
 2834     if (buf->size)
 2835         return 1;
 2836 
 2837     if (!LIST_ISEMPTY(&buffer_wait->list)) {
 2838         HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
 2839         LIST_DEL(&buffer_wait->list);
 2840         LIST_INIT(&buffer_wait->list);
 2841         HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
 2842     }
 2843 
 2844     if (b_alloc_margin(buf, global.tune.reserved_bufs))
 2845         return 1;
 2846 
 2847     HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
 2848     LIST_ADDQ(&buffer_wq, &buffer_wait->list);
 2849     HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
 2850     return 0;
 2851 }
 2852 
 2853 static void
 2854 spoe_release_buffer(struct buffer *buf, struct buffer_wait *buffer_wait)
 2855 {
 2856     if (!LIST_ISEMPTY(&buffer_wait->list)) {
 2857         HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
 2858         LIST_DEL(&buffer_wait->list);
 2859         LIST_INIT(&buffer_wait->list);
 2860         HA_SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
 2861     }
 2862 
 2863     /* Release the buffer if needed */
 2864     if (buf->size) {
 2865         b_free(buf);
 2866         offer_buffers(buffer_wait->target, tasks_run_queue);
 2867     }
 2868 }
 2869 
 2870 static int
 2871 spoe_wakeup_context(struct spoe_context *ctx)
 2872 {
 2873     task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 2874     return 1;
 2875 }
 2876 
 2877 static struct spoe_context *
 2878 spoe_create_context(struct stream *s, struct filter *filter)
 2879 {
 2880     struct spoe_config  *conf = FLT_CONF(filter);
 2881     struct spoe_context *ctx;
 2882 
 2883     ctx = pool_alloc_dirty(pool_head_spoe_ctx);
 2884     if (ctx == NULL) {
 2885         return NULL;
 2886     }
 2887     memset(ctx, 0, sizeof(*ctx));
 2888     ctx->filter      = filter;
 2889     ctx->state       = SPOE_CTX_ST_NONE;
 2890     ctx->status_code = SPOE_CTX_ERR_NONE;
 2891     ctx->flags       = 0;
 2892     ctx->events      = conf->agent->events;
 2893     ctx->groups      = &conf->agent->groups;
 2894     ctx->buffer      = BUF_NULL;
 2895     LIST_INIT(&ctx->buffer_wait.list);
 2896     ctx->buffer_wait.target = ctx;
 2897     ctx->buffer_wait.wakeup_cb = (int (*)(void *))spoe_wakeup_context;
 2898     LIST_INIT(&ctx->list);
 2899 
 2900     ctx->stream_id   = 0;
 2901     ctx->frame_id    = 1;
 2902     ctx->process_exp = TICK_ETERNITY;
 2903 
 2904     tv_zero(&ctx->stats.tv_start);
 2905     tv_zero(&ctx->stats.tv_request);
 2906     tv_zero(&ctx->stats.tv_queue);
 2907     tv_zero(&ctx->stats.tv_wait);
 2908     tv_zero(&ctx->stats.tv_response);
 2909     ctx->stats.t_request  = -1;
 2910     ctx->stats.t_queue    = -1;
 2911     ctx->stats.t_waiting  = -1;
 2912     ctx->stats.t_response = -1;
 2913     ctx->stats.t_process  = -1;
 2914     ctx->stats.t_total    =  0;
 2915 
 2916     ctx->strm   = s;
 2917     ctx->state  = SPOE_CTX_ST_READY;
 2918     filter->ctx = ctx;
 2919 
 2920     return ctx;
 2921 }
 2922 
 2923 static void
 2924 spoe_destroy_context(struct filter *filter)
 2925 {
 2926     struct spoe_config  *conf = FLT_CONF(filter);
 2927     struct spoe_context *ctx  = filter->ctx;
 2928 
 2929     if (!ctx)
 2930         return;
 2931 
 2932     spoe_stop_processing(conf->agent, ctx);
 2933     pool_free(pool_head_spoe_ctx, ctx);
 2934     filter->ctx = NULL;
 2935 }
 2936 
 2937 static void
 2938 spoe_reset_context(struct spoe_context *ctx)
 2939 {
 2940     ctx->state  = SPOE_CTX_ST_READY;
 2941     ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED);
 2942 
 2943     tv_zero(&ctx->stats.tv_start);
 2944     tv_zero(&ctx->stats.tv_request);
 2945     tv_zero(&ctx->stats.tv_queue);
 2946     tv_zero(&ctx->stats.tv_wait);
 2947     tv_zero(&ctx->stats.tv_response);
 2948     ctx->stats.t_request  = -1;
 2949     ctx->stats.t_queue    = -1;
 2950     ctx->stats.t_waiting  = -1;
 2951     ctx->stats.t_response = -1;
 2952     ctx->stats.t_process  = -1;
 2953     ctx->stats.t_total    =  0;
 2954 }
 2955 
 2956 
 2957 /***************************************************************************
 2958  * Hooks that manage the filter lifecycle (init/check/deinit)
 2959  **************************************************************************/
 2960 /* Signal handler: Do a soft stop, wakeup SPOE applet */
 2961 static void
 2962 spoe_sig_stop(struct sig_handler *sh)
 2963 {
 2964     struct proxy *p;
 2965 
 2966     p = proxies_list;
 2967     while (p) {
 2968         struct flt_conf *fconf;
 2969 
 2970         list_for_each_entry(fconf, &p->filter_configs, list) {
 2971             struct spoe_config *conf;
 2972             struct spoe_agent  *agent;
 2973             struct spoe_appctx *spoe_appctx;
 2974             int i;
 2975 
 2976             if (fconf->id != spoe_filter_id)
 2977                 continue;
 2978 
 2979             conf  = fconf->conf;
 2980             agent = conf->agent;
 2981 
 2982             for (i = 0; i < global.nbthread; ++i) {
 2983                 HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[i].lock);
 2984                 list_for_each_entry(spoe_appctx, &agent->rt[i].applets, list)
 2985                     spoe_wakeup_appctx(spoe_appctx->owner);
 2986                 HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[i].lock);
 2987             }
 2988         }
 2989         p = p->next;
 2990     }
 2991 }
 2992 
 2993 
 2994 /* Initialize the SPOE filter. Returns -1 on error, else 0. */
 2995 static int
 2996 spoe_init(struct proxy *px, struct flt_conf *fconf)
 2997 {
 2998     struct spoe_config *conf = fconf->conf;
 2999 
 3000     /* conf->agent_fe was already initialized during the config
 3001      * parsing. Finish initialization. */
 3002         conf->agent_fe.last_change = now.tv_sec;
 3003         conf->agent_fe.cap = PR_CAP_FE;
 3004         conf->agent_fe.mode = PR_MODE_TCP;
 3005         conf->agent_fe.maxconn = 0;
 3006         conf->agent_fe.options2 |= PR_O2_INDEPSTR;
 3007         conf->agent_fe.conn_retries = CONN_RETRIES;
 3008         conf->agent_fe.accept = frontend_accept;
 3009         conf->agent_fe.srv = NULL;
 3010         conf->agent_fe.timeout.client = TICK_ETERNITY;
 3011     conf->agent_fe.default_target = &spoe_applet.obj_type;
 3012     conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
 3013 
 3014     if (!sighandler_registered) {
 3015         signal_register_fct(0, spoe_sig_stop, 0);
 3016         sighandler_registered = 1;
 3017     }
 3018 
 3019     fconf->flags |= FLT_CFG_FL_HTX;
 3020     return 0;
 3021 }
 3022 
 3023 /* Free ressources allocated by the SPOE filter. */
 3024 static void
 3025 spoe_deinit(struct proxy *px, struct flt_conf *fconf)
 3026 {
 3027     struct spoe_config *conf = fconf->conf;
 3028 
 3029     if (conf) {
 3030         struct spoe_agent *agent = conf->agent;
 3031 
 3032         spoe_release_agent(agent);
 3033         free(conf->id);
 3034         free(conf);
 3035     }
 3036     fconf->conf = NULL;
 3037 }
 3038 
 3039 /* Check configuration of a SPOE filter for a specified proxy.
 3040  * Return 1 on error, else 0. */
 3041 static int
 3042 spoe_check(struct proxy *px, struct flt_conf *fconf)
 3043 {
 3044     struct flt_conf    *f;
 3045     struct spoe_config *conf = fconf->conf;
 3046     struct proxy       *target;
 3047     int i;
 3048 
 3049     /* Check all SPOE filters for proxy <px> to be sure all SPOE agent names
 3050      * are uniq */
 3051     list_for_each_entry(f, &px->filter_configs, list) {
 3052         struct spoe_config *c = f->conf;
 3053 
 3054         /* This is not an SPOE filter */
 3055         if (f->id != spoe_filter_id)
 3056             continue;
 3057         /* This is the current SPOE filter */
 3058         if (f == fconf)
 3059             continue;
 3060 
 3061         /* Check engine Id. It should be uniq */
 3062         if (!strcmp(conf->id, c->id)) {
 3063             ha_alert("Proxy %s : duplicated name for SPOE engine '%s'.\n",
 3064                  px->id, conf->id);
 3065             return 1;
 3066         }
 3067     }
 3068 
 3069     target = proxy_be_by_name(conf->agent->b.name);
 3070     if (target == NULL) {
 3071         ha_alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'"
 3072              " declared at %s:%d.\n",
 3073              px->id, conf->agent->b.name, conf->agent->id,
 3074              conf->agent->conf.file, conf->agent->conf.line);
 3075         return 1;
 3076     }
 3077     if (target->mode != PR_MODE_TCP) {
 3078         ha_alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
 3079              " at %s:%d does not support HTTP mode.\n",
 3080              px->id, target->id, conf->agent->id,
 3081              conf->agent->conf.file, conf->agent->conf.line);
 3082         return 1;
 3083     }
 3084 
 3085     if (px->bind_proc & ~target->bind_proc) {
 3086         ha_alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
 3087              " at %s:%d does not cover all of its processes.\n",
 3088              px->id, target->id, conf->agent->id,
 3089              conf->agent->conf.file, conf->agent->conf.line);
 3090         return 1;
 3091     }
 3092 
 3093     if ((conf->agent->rt = calloc(global.nbthread, sizeof(*conf->agent->rt))) == NULL) {
 3094         ha_alert("Proxy %s : out of memory initializing SPOE agent '%s' declared at %s:%d.\n",
 3095              px->id, conf->agent->id, conf->agent->conf.file, conf->agent->conf.line);
 3096         return 1;
 3097     }
 3098     for (i = 0; i < global.nbthread; ++i) {
 3099         conf->agent->rt[i].engine_id    = NULL;
 3100         conf->agent->rt[i].frame_size   = conf->agent->max_frame_size;
 3101         conf->agent->rt[i].processing   = 0;
 3102         LIST_INIT(&conf->agent->rt[i].applets);
 3103         LIST_INIT(&conf->agent->rt[i].sending_queue);
 3104         LIST_INIT(&conf->agent->rt[i].waiting_queue);
 3105         HA_SPIN_INIT(&conf->agent->rt[i].lock);
 3106     }
 3107 
 3108     free(conf->agent->b.name);
 3109     conf->agent->b.name = NULL;
 3110     conf->agent->b.be = target;
 3111     return 0;
 3112 }
 3113 
 3114 /* Initializes the SPOE filter for a proxy for a specific thread.
 3115  * Returns a negative value if an error occurs. */
 3116 static int
 3117 spoe_init_per_thread(struct proxy *p, struct flt_conf *fconf)
 3118 {
 3119     struct spoe_config *conf = fconf->conf;
 3120     struct spoe_agent *agent = conf->agent;
 3121 
 3122     /* Use a != seed per process */
 3123     if (relative_pid > 1 && tid == 0)
 3124         srandom(now_ms * pid);
 3125 
 3126     agent->rt[tid].engine_id = generate_pseudo_uuid();
 3127     if (agent->rt[tid].engine_id == NULL)
 3128         return -1;
 3129     return 0;
 3130 }
 3131 
 3132 /**************************************************************************
 3133  * Hooks attached to a stream
 3134  *************************************************************************/
 3135 /* Called when a filter instance is created and attach to a stream. It creates
 3136  * the context that will be used to process this stream. */
 3137 static int
 3138 spoe_start(struct stream *s, struct filter *filter)
 3139 {
 3140     struct spoe_config  *conf  = FLT_CONF(filter);
 3141     struct spoe_agent   *agent = conf->agent;
 3142     struct spoe_context *ctx;
 3143 
 3144     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
 3145             (int)now.tv_sec, (int)now.tv_usec, agent->id,
 3146             __FUNCTION__, s);
 3147 
 3148     if ((ctx = spoe_create_context(s, filter)) == NULL) {
 3149         SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 3150                 " - failed to create SPOE context\n",
 3151                 (int)now.tv_sec, (int)now.tv_usec, agent->id,
 3152                 __FUNCTION__, s);
 3153         send_log(&conf->agent_fe, LOG_EMERG,
 3154              "SPOE: [%s] failed to create SPOE context\n",
 3155              agent->id);
 3156         return 0;
 3157     }
 3158 
 3159     if (!LIST_ISEMPTY(&ctx->events[SPOE_EV_ON_TCP_REQ_FE]))
 3160         filter->pre_analyzers |= AN_REQ_INSPECT_FE;
 3161 
 3162     if (!LIST_ISEMPTY(&ctx->events[SPOE_EV_ON_TCP_REQ_BE]))
 3163         filter->pre_analyzers |= AN_REQ_INSPECT_BE;
 3164 
 3165     if (!LIST_ISEMPTY(&ctx->events[SPOE_EV_ON_TCP_RSP]))
 3166         filter->pre_analyzers |= AN_RES_INSPECT;
 3167 
 3168     if (!LIST_ISEMPTY(&ctx->events[SPOE_EV_ON_HTTP_REQ_FE]))
 3169         filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE;
 3170 
 3171     if (!LIST_ISEMPTY(&ctx->events[SPOE_EV_ON_HTTP_REQ_BE]))
 3172         filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE;
 3173 
 3174     if (!LIST_ISEMPTY(&ctx->events[SPOE_EV_ON_HTTP_RSP]))
 3175         filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE;
 3176 
 3177     return 1;
 3178 }
 3179 
 3180 /* Called when a filter instance is detached from a stream. It release the
 3181  * attached SPOE context. */
 3182 static void
 3183 spoe_stop(struct stream *s, struct filter *filter)
 3184 {
 3185     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
 3186             (int)now.tv_sec, (int)now.tv_usec,
 3187             ((struct spoe_config *)FLT_CONF(filter))->agent->id,
 3188             __FUNCTION__, s);
 3189     spoe_destroy_context(filter);
 3190 }
 3191 
 3192 
 3193 /*
 3194  * Called when the stream is woken up because of expired timer.
 3195  */
 3196 static void
 3197 spoe_check_timeouts(struct stream *s, struct filter *filter)
 3198 {
 3199     struct spoe_context *ctx = filter->ctx;
 3200 
 3201     if (tick_is_expired(ctx->process_exp, now_ms))
 3202         s->pending_events |= TASK_WOKEN_MSG;
 3203 }
 3204 
 3205 /* Called when we are ready to filter data on a channel */
 3206 static int
 3207 spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
 3208 {
 3209     struct spoe_context *ctx = filter->ctx;
 3210     int                  ret = 1;
 3211 
 3212     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
 3213             " - ctx-flags=0x%08x\n",
 3214             (int)now.tv_sec, (int)now.tv_usec,
 3215             ((struct spoe_config *)FLT_CONF(filter))->agent->id,
 3216             __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
 3217 
 3218     if (ctx->state == SPOE_CTX_ST_NONE)
 3219         goto out;
 3220 
 3221     if (!(chn->flags & CF_ISRESP)) {
 3222         if (filter->pre_analyzers & AN_REQ_INSPECT_FE)
 3223             chn->analysers |= AN_REQ_INSPECT_FE;
 3224         if (filter->pre_analyzers & AN_REQ_INSPECT_BE)
 3225             chn->analysers |= AN_REQ_INSPECT_BE;
 3226 
 3227         if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED)
 3228             goto out;
 3229 
 3230         ctx->stream_id = s->uniq_id;
 3231         ret = spoe_process_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
 3232         if (!ret)
 3233             goto out;
 3234         ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
 3235     }
 3236     else {
 3237         if (filter->pre_analyzers & SPOE_EV_ON_TCP_RSP)
 3238             chn->analysers |= AN_RES_INSPECT;
 3239 
 3240         if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
 3241             goto out;
 3242 
 3243         ret = spoe_process_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
 3244         if (!ret) {
 3245             channel_dont_read(chn);
 3246             channel_dont_close(chn);
 3247             goto out;
 3248         }
 3249         ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
 3250     }
 3251 
 3252   out:
 3253     return ret;
 3254 }
 3255 
 3256 /* Called before a processing happens on a given channel */
 3257 static int
 3258 spoe_chn_pre_analyze(struct stream *s, struct filter *filter,
 3259              struct channel *chn, unsigned an_bit)
 3260 {
 3261     struct spoe_context *ctx = filter->ctx;
 3262     int                  ret = 1;
 3263 
 3264     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
 3265             " - ctx-flags=0x%08x - ana=0x%08x\n",
 3266             (int)now.tv_sec, (int)now.tv_usec,
 3267             ((struct spoe_config *)FLT_CONF(filter))->agent->id,
 3268             __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
 3269             ctx->flags, an_bit);
 3270 
 3271     if (ctx->state == SPOE_CTX_ST_NONE)
 3272         goto out;
 3273 
 3274     switch (an_bit) {
 3275         case AN_REQ_INSPECT_FE:
 3276             ret = spoe_process_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
 3277             break;
 3278         case AN_REQ_INSPECT_BE:
 3279             ret = spoe_process_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
 3280             break;
 3281         case AN_RES_INSPECT:
 3282             ret = spoe_process_event(s, ctx, SPOE_EV_ON_TCP_RSP);
 3283             break;
 3284         case AN_REQ_HTTP_PROCESS_FE:
 3285             ret = spoe_process_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
 3286             break;
 3287         case AN_REQ_HTTP_PROCESS_BE:
 3288             ret = spoe_process_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
 3289             break;
 3290         case AN_RES_HTTP_PROCESS_FE:
 3291             ret = spoe_process_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
 3292             break;
 3293     }
 3294 
 3295   out:
 3296     if (!ret && (chn->flags & CF_ISRESP)) {
 3297                 channel_dont_read(chn);
 3298                 channel_dont_close(chn);
 3299     }
 3300     return ret;
 3301 }
 3302 
 3303 /* Called when the filtering on the channel ends. */
 3304 static int
 3305 spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
 3306 {
 3307     struct spoe_context *ctx = filter->ctx;
 3308 
 3309     SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
 3310             " - ctx-flags=0x%08x\n",
 3311             (int)now.tv_sec, (int)now.tv_usec,
 3312             ((struct spoe_config *)FLT_CONF(filter))->agent->id,
 3313             __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
 3314 
 3315     if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
 3316         spoe_reset_context(ctx);
 3317     }
 3318 
 3319     return 1;
 3320 }
 3321 
 3322 /********************************************************************
 3323  * Functions that manage the filter initialization
 3324  ********************************************************************/
 3325 struct flt_ops spoe_ops = {
 3326     /* Manage SPOE filter, called for each filter declaration */
 3327     .init   = spoe_init,
 3328     .deinit = spoe_deinit,
 3329     .check  = spoe_check,
 3330     .init_per_thread = spoe_init_per_thread,
 3331 
 3332     /* Handle start/stop of SPOE */
 3333     .attach         = spoe_start,
 3334     .detach         = spoe_stop,
 3335     .check_timeouts = spoe_check_timeouts,
 3336 
 3337     /* Handle channels activity */
 3338     .channel_start_analyze = spoe_start_analyze,
 3339     .channel_pre_analyze   = spoe_chn_pre_analyze,
 3340     .channel_end_analyze   = spoe_end_analyze,
 3341 };
 3342 
 3343 
 3344 static int
 3345 cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
 3346 {
 3347     const char *err;
 3348     int         i, err_code = 0;
 3349 
 3350     if ((cfg_scope == NULL && curengine != NULL) ||
 3351         (cfg_scope != NULL && curengine == NULL) ||
 3352         (curengine != NULL && cfg_scope != NULL && strcmp(curengine, cfg_scope)))
 3353         goto out;
 3354 
 3355     if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */
 3356         if (!*args[1]) {
 3357             ha_alert("parsing [%s:%d] : missing name for spoe-agent section.\n",
 3358                  file, linenum);
 3359             err_code |= ERR_ALERT | ERR_ABORT;
 3360             goto out;
 3361         }
 3362         if (alertif_too_many_args(1, file, linenum, args, &err_code)) {
 3363             err_code |= ERR_ABORT;
 3364             goto out;
 3365         }
 3366 
 3367         err = invalid_char(args[1]);
 3368         if (err) {
 3369             ha_alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
 3370                  file, linenum, *err, args[0], args[1]);
 3371             err_code |= ERR_ALERT | ERR_ABORT;
 3372             goto out;
 3373         }
 3374 
 3375         if (curagent != NULL) {
 3376             ha_alert("parsing [%s:%d] : another spoe-agent section previously defined.\n",
 3377                  file, linenum);
 3378             err_code |= ERR_ALERT | ERR_ABORT;
 3379             goto out;
 3380         }
 3381         if ((curagent = calloc(1, sizeof(*curagent))) == NULL) {
 3382             ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
 3383             err_code |= ERR_ALERT | ERR_ABORT;
 3384             goto out;
 3385         }
 3386 
 3387         curagent->id              = strdup(args[1]);
 3388 
 3389         curagent->conf.file       = strdup(file);
 3390         curagent->conf.line       = linenum;
 3391 
 3392         curagent->timeout.hello      = TICK_ETERNITY;
 3393         curagent->timeout.idle       = TICK_ETERNITY;
 3394         curagent->timeout.processing = TICK_ETERNITY;
 3395 
 3396         curagent->var_pfx        = NULL;
 3397         curagent->var_on_error   = NULL;
 3398         curagent->var_t_process  = NULL;
 3399         curagent->var_t_total    = NULL;
 3400         curagent->flags          = (SPOE_FL_ASYNC | SPOE_FL_PIPELINING | SPOE_FL_SND_FRAGMENTATION);
 3401         curagent->cps_max        = 0;
 3402         curagent->eps_max        = 0;
 3403         curagent->max_frame_size = MAX_FRAME_SIZE;
 3404         curagent->max_fpa        = 20;
 3405 
 3406         for (i = 0; i < SPOE_EV_EVENTS; ++i)
 3407             LIST_INIT(&curagent->events[i]);
 3408         LIST_INIT(&curagent->groups);
 3409         LIST_INIT(&curagent->messages);
 3410     }
 3411     else if (!strcmp(args[0], "use-backend")) {
 3412         if (!*args[1]) {
 3413             ha_alert("parsing [%s:%d] : '%s' expects a backend name.\n",
 3414                  file, linenum, args[0]);
 3415             err_code |= ERR_ALERT | ERR_FATAL;
 3416             goto out;
 3417         }
 3418         if (alertif_too_many_args(1, file, linenum, args, &err_code))
 3419             goto out;
 3420         free(curagent->b.name);
 3421         curagent->b.name = strdup(args[1]);
 3422     }
 3423     else if (!strcmp(args[0], "messages")) {
 3424         int cur_arg = 1;
 3425         while (*args[cur_arg]) {
 3426             struct spoe_placeholder *ph = NULL;
 3427 
 3428             list_for_each_entry(ph, &curmphs, list) {
 3429                 if (!strcmp(ph->id, args[cur_arg])) {
 3430                     ha_alert("parsing [%s:%d]: spoe-message '%s' already used.\n",
 3431                          file, linenum, args[cur_arg]);
 3432                     err_code |= ERR_ALERT | ERR_FATAL;
 3433                     goto out;
 3434                 }
 3435             }
 3436 
 3437             if ((ph = calloc(1, sizeof(*ph))) == NULL) {
 3438                 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
 3439                 err_code |= ERR_ALERT | ERR_ABORT;
 3440                 goto out;
 3441             }
 3442             ph->id = strdup(args[cur_arg]);
 3443             LIST_ADDQ(&curmphs, &ph->list);
 3444             cur_arg++;
 3445         }
 3446     }
 3447     else if (!strcmp(args[0], "groups")) {
 3448         int cur_arg = 1;
 3449         while (*args[cur_arg]) {
 3450             struct spoe_placeholder *ph = NULL;
 3451 
 3452             list_for_each_entry(ph, &curgphs, list) {
 3453                 if (!strcmp(ph->id, args[cur_arg])) {
 3454                     ha_alert("parsing [%s:%d]: spoe-group '%s' already used.\n",
 3455                          file, linenum, args[cur_arg]);
 3456                     err_code |= ERR_ALERT | ERR_FATAL;
 3457                     goto out;
 3458                 }
 3459             }
 3460 
 3461             if ((ph = calloc(1, sizeof(*ph))) == NULL) {
 3462                 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
 3463                 err_code |= ERR_ALERT | ERR_ABORT;
 3464                 goto out;
 3465             }
 3466             ph->id = strdup(args[cur_arg]);
 3467             LIST_ADDQ(&curgphs, &ph->list);
 3468             cur_arg++;
 3469         }
 3470     }
 3471     else if (!strcmp(args[0], "timeout")) {
 3472         unsigned int *tv = NULL;
 3473         const char   *res;
 3474         unsigned      timeout;
 3475 
 3476         if (!*args[1]) {
 3477             ha_alert("parsing [%s:%d] : 'timeout' expects 'hello', 'idle' and 'processing'.\n",
 3478                  file, linenum);
 3479             err_code |= ERR_ALERT | ERR_FATAL;
 3480             goto out;
 3481         }
 3482         if (alertif_too_many_args(2, file, linenum, args, &err_code))
 3483             goto out;
 3484         if (!strcmp(args[1], "hello"))
 3485             tv = &curagent->timeout.hello;
 3486         else if (!strcmp(args[1], "idle"))
 3487             tv = &curagent->timeout.idle;
 3488         else if (!strcmp(args[1], "processing"))
 3489             tv = &curagent->timeout.processing;
 3490         else {
 3491             ha_alert("parsing [%s:%d] : 'timeout' supports 'hello', 'idle' or 'processing' (got %s).\n",
 3492                  file, linenum, args[1]);
 3493             err_code |= ERR_ALERT | ERR_FATAL;
 3494             goto out;
 3495         }
 3496         if (!*args[2]) {
 3497             ha_alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n",
 3498                  file, linenum, args[1]);
 3499             err_code |= ERR_ALERT | ERR_FATAL;
 3500             goto out;
 3501         }
 3502         res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
 3503         if (res == PARSE_TIME_OVER) {
 3504             ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
 3505                  file, linenum, args[2], args[0], args[1]);
 3506             err_code |= ERR_ALERT | ERR_FATAL;
 3507             goto out;
 3508         }
 3509         else if (res == PARSE_TIME_UNDER) {
 3510             ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
 3511                  file, linenum, args[2], args[0], args[1]);
 3512             err_code |= ERR_ALERT | ERR_FATAL;
 3513             goto out;
 3514         }
 3515         else if (res) {
 3516             ha_alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n",
 3517                  file, linenum, *res, args[1]);
 3518             err_code |= ERR_ALERT | ERR_FATAL;
 3519             goto out;
 3520         }
 3521         *tv = MS_TO_TICKS(timeout);
 3522     }
 3523     else if (!strcmp(args[0], "option")) {
 3524         if (!*args[1]) {
 3525                         ha_alert("parsing [%s:%d]: '%s' expects an option name.\n",
 3526                  file, linenum, args[0]);
 3527                         err_code |= ERR_ALERT | ERR_FATAL;
 3528                         goto out;
 3529                 }
 3530 
 3531         if (!strcmp(args[1], "pipelining")) {
 3532             if (alertif_too_many_args(1, file, linenum, args, &err_code))
 3533                 goto out;
 3534             if (kwm == 1)
 3535                 curagent->flags &= ~SPOE_FL_PIPELINING;
 3536             else
 3537                 curagent->flags |= SPOE_FL_PIPELINING;
 3538             goto out;
 3539         }
 3540         else if (!strcmp(args[1], "async")) {
 3541             if (alertif_too_many_args(1, file, linenum, args, &err_code))
 3542                 goto out;
 3543             if (kwm == 1)
 3544                 curagent->flags &= ~SPOE_FL_ASYNC;
 3545             else
 3546                 curagent->flags |= SPOE_FL_ASYNC;
 3547             goto out;
 3548         }
 3549         else if (!strcmp(args[1], "send-frag-payload")) {
 3550             if (alertif_too_many_args(1, file, linenum, args, &err_code))
 3551                 goto out;
 3552             if (kwm == 1)
 3553                 curagent->flags &= ~SPOE_FL_SND_FRAGMENTATION;
 3554             else
 3555                 curagent->flags |= SPOE_FL_SND_FRAGMENTATION;
 3556             goto out;
 3557         }
 3558         else if (!strcmp(args[1], "dontlog-normal")) {
 3559             if (alertif_too_many_args(1, file, linenum, args, &err_code))
 3560                 goto out;
 3561             if (kwm == 1)
 3562                 curpxopts2 &= ~PR_O2_NOLOGNORM;
 3563             else
 3564                 curpxopts2 |= PR_O2_NOLOGNORM;
 3565             goto out;
 3566         }
 3567 
 3568         /* Following options does not support negation */
 3569         if (kwm == 1) {
 3570             ha_alert("parsing [%s:%d]: negation is not supported for option '%s'.\n",
 3571                  file, linenum, args[1]);
 3572             err_code |= ERR_ALERT | ERR_FATAL;
 3573             goto out;
 3574         }
 3575 
 3576         if (!strcmp(args[1], "var-prefix")) {
 3577             char *tmp;
 3578 
 3579             if (!*args[2]) {
 3580                 ha_alert("parsing [%s:%d]: '%s %s' expects a value.\n",
 3581                      file, linenum, args[0],
 3582                      args[1]);
 3583                 err_code |= ERR_ALERT | ERR_FATAL;
 3584                 goto out;
 3585             }
 3586             if (alertif_too_many_args(2, file, linenum, args, &err_code))
 3587                 goto out;
 3588             tmp = args[2];
 3589             while (*tmp) {
 3590                 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
 3591                     ha_alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z0-9_.] chars.\n",
 3592                          file, linenum, args[0], args[1]);
 3593                     err_code |= ERR_ALERT | ERR_FATAL;
 3594                     goto out;
 3595                 }
 3596                 tmp++;
 3597             }
 3598             curagent->var_pfx = strdup(args[2]);
 3599         }
 3600         else if (!strcmp(args[1], "force-set-var")) {
 3601             if (alertif_too_many_args(1, file, linenum, args, &err_code))
 3602                 goto out;
 3603             curagent->flags |= SPOE_FL_FORCE_SET_VAR;
 3604         }
 3605         else if (!strcmp(args[1], "continue-on-error")) {
 3606             if (alertif_too_many_args(1, file, linenum, args, &err_code))
 3607                 goto out;
 3608             curagent->flags |= SPOE_FL_CONT_ON_ERR;
 3609         }
 3610         else if (!strcmp(args[1], "set-on-error")) {
 3611             char *tmp;
 3612 
 3613             if (!*args[2]) {
 3614                 ha_alert("parsing [%s:%d]: '%s %s' expects a value.\n",
 3615                      file, linenum, args[0],
 3616                      args[1]);
 3617                 err_code |= ERR_ALERT | ERR_FATAL;
 3618                 goto out;
 3619             }
 3620             if (alertif_too_many_args(2, file, linenum, args, &err_code))
 3621                 goto out;
 3622             tmp = args[2];
 3623             while (*tmp) {
 3624                 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
 3625                     ha_alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z0-9_.] chars.\n",
 3626                          file, linenum, args[0], args[1]);
 3627                     err_code |= ERR_ALERT | ERR_FATAL;
 3628                     goto out;
 3629                 }
 3630                 tmp++;
 3631             }
 3632             curagent->var_on_error = strdup(args[2]);
 3633         }
 3634         else if (!strcmp(args[1], "set-process-time")) {
 3635             char *tmp;
 3636 
 3637             if (!*args[2]) {
 3638                 ha_alert("parsing [%s:%d]: '%s %s' expects a value.\n",
 3639                      file, linenum, args[0],
 3640                      args[1]);
 3641                 err_code |= ERR_ALERT | ERR_FATAL;
 3642                 goto out;
 3643             }
 3644             if (alertif_too_many_args(2, file, linenum, args, &err_code))
 3645                 goto out;
 3646             tmp = args[2];
 3647             while (*tmp) {
 3648                 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
 3649                     ha_alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z0-9_.] chars.\n",
 3650                          file, linenum, args[0], args[1]);
 3651                     err_code |= ERR_ALERT | ERR_FATAL;
 3652                     goto out;
 3653                 }
 3654                 tmp++;
 3655             }
 3656             curagent->var_t_process = strdup(args[2]);
 3657         }
 3658         else if (!strcmp(args[1], "set-total-time")) {
 3659             char *tmp;
 3660 
 3661             if (!*args[2]) {
 3662                 ha_alert("parsing [%s:%d]: '%s %s' expects a value.\n",
 3663                      file, linenum, args[0],
 3664                      args[1]);
 3665                 err_code |= ERR_ALERT | ERR_FATAL;
 3666                 goto out;
 3667             }
 3668             if (alertif_too_many_args(2, file, linenum, args, &err_code))
 3669                 goto out;
 3670             tmp = args[2];
 3671             while (*tmp) {
 3672                 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
 3673                     ha_alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z0-9_.] chars.\n",
 3674                          file, linenum, args[0], args[1]);
 3675                     err_code |= ERR_ALERT | ERR_FATAL;
 3676                     goto out;
 3677                 }
 3678                 tmp++;
 3679             }
 3680             curagent->var_t_total = strdup(args[2]);
 3681         }
 3682         else {
 3683             ha_alert("parsing [%s:%d]: option '%s' is not supported.\n",
 3684                  file, linenum, args[1]);
 3685             err_code |= ERR_ALERT | ERR_FATAL;
 3686             goto out;
 3687         }
 3688     }
 3689     else if (!strcmp(args[0], "maxconnrate")) {
 3690         if (!*args[1]) {
 3691             ha_alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
 3692                  file, linenum, args[0]);
 3693                         err_code |= ERR_ALERT | ERR_FATAL;
 3694                         goto out;
 3695                 }
 3696         if (alertif_too_many_args(1, file, linenum, args, &err_code))
 3697             goto out;
 3698         curagent->cps_max = atol(args[1]);
 3699     }
 3700     else if (!strcmp(args[0], "maxerrrate")) {
 3701         if (!*args[1]) {
 3702             ha_alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
 3703                  file, linenum, args[0]);
 3704                         err_code |= ERR_ALERT | ERR_FATAL;
 3705                         goto out;
 3706                 }
 3707         if (alertif_too_many_args(1, file, linenum, args, &err_code))
 3708             goto out;
 3709         curagent->eps_max = atol(args[1]);
 3710     }
 3711     else if (!strcmp(args[0], "max-frame-size")) {
 3712         if (!*args[1]) {
 3713             ha_alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
 3714                  file, linenum, args[0]);
 3715                         err_code |= ERR_ALERT | ERR_FATAL;
 3716                         goto out;
 3717                 }
 3718         if (alertif_too_many_args(1, file, linenum, args, &err_code))
 3719             goto out;
 3720         curagent->max_frame_size = atol(args[1]);
 3721         if (curagent->max_frame_size < MIN_FRAME_SIZE ||
 3722             curagent->max_frame_size > MAX_FRAME_SIZE) {
 3723             ha_alert("parsing [%s:%d] : '%s' expects a positive integer argument in the range [%d, %d].\n",
 3724                  file, linenum, args[0], MIN_FRAME_SIZE, MAX_FRAME_SIZE);
 3725             err_code |= ERR_ALERT | ERR_FATAL;
 3726             goto out;
 3727         }
 3728     }
 3729     else if (!strcmp(args[0], "max-waiting-frames")) {
 3730         if (!*args[1]) {
 3731             ha_alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
 3732                  file, linenum, args[0]);
 3733                         err_code |= ERR_ALERT | ERR_FATAL;
 3734                         goto out;
 3735                 }
 3736         if (alertif_too_many_args(1, file, linenum, args, &err_code))
 3737             goto out;
 3738         curagent->max_fpa = atol(args[1]);
 3739         if (curagent->max_fpa < 1) {
 3740             ha_alert("parsing [%s:%d] : '%s' expects a positive integer argument.\n",
 3741                  file, linenum, args[0]);
 3742             err_code |= ERR_ALERT | ERR_FATAL;
 3743             goto out;
 3744         }
 3745     }
 3746     else if (!strcmp(args[0], "register-var-names")) {
 3747         int   cur_arg;
 3748 
 3749         if (!*args[1]) {
 3750             ha_alert("parsing [%s:%d] : '%s' expects one or more variable names.\n",
 3751                  file, linenum, args[0]);
 3752                         err_code |= ERR_ALERT | ERR_FATAL;
 3753                         goto out;
 3754                 }
 3755         cur_arg = 1;
 3756         while (*args[cur_arg]) {
 3757             struct spoe_var_placeholder *vph;
 3758 
 3759             if ((vph = calloc(1, sizeof(*vph))) == NULL) {
 3760                 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
 3761                 err_code |= ERR_ALERT | ERR_ABORT;
 3762                 goto out;
 3763             }
 3764             if ((vph->name  = strdup(args[cur_arg])) == NULL) {
 3765                 free(vph);
 3766                 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
 3767                 err_code |= ERR_ALERT | ERR_ABORT;
 3768                 goto out;
 3769             }
 3770             LIST_ADDQ(&curvars, &vph->list);
 3771             cur_arg++;
 3772         }
 3773     }
 3774     else if (!strcmp(args[0], "log")) {
 3775         char *errmsg = NULL;
 3776 
 3777         if (!parse_logsrv(args, &curlogsrvs, (kwm == 1), &errmsg)) {
 3778             ha_alert("parsing [%s:%d] : %s : %s\n", file, linenum, args[0], errmsg);
 3779             err_code |= ERR_ALERT | ERR_FATAL;
 3780             goto out;
 3781         }
 3782     }
 3783     else if (*args[0]) {
 3784         ha_alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
 3785              file, linenum, args[0]);
 3786         err_code |= ERR_ALERT | ERR_FATAL;
 3787         goto out;
 3788     }
 3789  out:
 3790     return err_code;
 3791 }
 3792 static int
 3793 cfg_parse_spoe_group(const char *file, int linenum, char **args, int kwm)
 3794 {
 3795     struct spoe_group *grp;
 3796     const char        *err;
 3797     int                err_code = 0;
 3798 
 3799     if ((cfg_scope == NULL && curengine != NULL) ||
 3800         (cfg_scope != NULL && curengine == NULL) ||
 3801         (curengine != NULL && cfg_scope != NULL && strcmp(curengine, cfg_scope)))
 3802         goto out;
 3803 
 3804     if (!strcmp(args[0], "spoe-group")) { /* new spoe-group section */
 3805         if (!*args[1]) {
 3806             ha_alert("parsing [%s:%d] : missing name for spoe-group section.\n",
 3807                  file, linenum);
 3808             err_code |= ERR_ALERT | ERR_ABORT;
 3809             goto out;
 3810         }
 3811         if (alertif_too_many_args(1, file, linenum, args, &err_code)) {
 3812             err_code |= ERR_ABORT;
 3813             goto out;
 3814         }
 3815 
 3816         err = invalid_char(args[1]);
 3817         if (err) {
 3818             ha_alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
 3819                  file, linenum, *err, args[0], args[1]);
 3820             err_code |= ERR_ALERT | ERR_ABORT;
 3821             goto out;
 3822         }
 3823 
 3824         list_for_each_entry(grp, &curgrps, list) {
 3825             if (!strcmp(grp->id, args[1])) {
 3826                 ha_alert("parsing [%s:%d]: spoe-group section '%s' has the same"
 3827                      " name as another one declared at %s:%d.\n",
 3828                      file, linenum, args[1], grp->conf.file, grp->conf.line);
 3829                 err_code |= ERR_ALERT | ERR_FATAL;
 3830                 goto out;
 3831             }
 3832         }
 3833 
 3834         if ((curgrp = calloc(1, sizeof(*curgrp))) == NULL) {
 3835             ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
 3836             err_code |= ERR_ALERT | ERR_ABORT;
 3837             goto out;
 3838         }
 3839 
 3840         curgrp->id        = strdup(args[1]);
 3841         curgrp->conf.file = strdup(file);
 3842         curgrp->conf.line = linenum;
 3843         LIST_INIT(&curgrp->phs);
 3844         LIST_INIT(&curgrp->messages);
 3845         LIST_ADDQ(&curgrps, &curgrp->list);
 3846     }
 3847     else if (!strcmp(args[0], "messages")) {
 3848         int cur_arg = 1;
 3849         while (*args[cur_arg]) {
 3850             struct spoe_placeholder *ph = NULL;
 3851 
 3852             list_for_each_entry(ph, &curgrp->phs, list) {
 3853                 if (!strcmp(ph->id, args[cur_arg])) {
 3854                     ha_alert("parsing [%s:%d]: spoe-message '%s' already used.\n",
 3855                          file, linenum, args[cur_arg]);
 3856                     err_code |= ERR_ALERT | ERR_FATAL;
 3857                     goto out;
 3858                 }
 3859             }
 3860 
 3861             if ((ph = calloc(1, sizeof(*ph))) == NULL) {
 3862                 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
 3863                 err_code |= ERR_ALERT | ERR_ABORT;
 3864                 goto out;
 3865             }
 3866             ph->id = strdup(args[cur_arg]);
 3867             LIST_ADDQ(&curgrp->phs, &ph->list);
 3868             cur_arg++;
 3869         }
 3870     }
 3871     else if (*args[0]) {
 3872         ha_alert("parsing [%s:%d] : unknown keyword '%s' in spoe-group section.\n",
 3873              file, linenum, args[0]);
 3874         err_code |= ERR_ALERT | ERR_FATAL;
 3875         goto out;
 3876     }
 3877  out:
 3878     return err_code;
 3879 }
 3880 
 3881 static int
 3882 cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm)
 3883 {
 3884     struct spoe_message *msg;
 3885     struct spoe_arg     *arg;
 3886     const char          *err;
 3887     char                *errmsg   = NULL;
 3888     int                  err_code = 0;
 3889 
 3890     if ((cfg_scope == NULL && curengine != NULL) ||
 3891         (cfg_scope != NULL && curengine == NULL) ||
 3892         (curengine != NULL && cfg_scope != NULL && strcmp(curengine, cfg_scope)))
 3893         goto out;
 3894 
 3895     if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */
 3896         if (!*args[1]) {
 3897             ha_alert("parsing [%s:%d] : missing name for spoe-message section.\n",
 3898                  file, linenum);
 3899             err_code |= ERR_ALERT | ERR_ABORT;
 3900             goto out;
 3901         }
 3902         if (alertif_too_many_args(1, file, linenum, args, &err_code)) {
 3903             err_code |= ERR_ABORT;
 3904             goto out;
 3905         }
 3906 
 3907         err = invalid_char(args[1]);
 3908         if (err) {
 3909             ha_alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
 3910                  file, linenum, *err, args[0], args[1]);
 3911             err_code |= ERR_ALERT | ERR_ABORT;
 3912             goto out;
 3913         }
 3914 
 3915         list_for_each_entry(msg, &curmsgs, list) {
 3916             if (!strcmp(msg->id, args[1])) {
 3917                 ha_alert("parsing [%s:%d]: spoe-message section '%s' has the same"
 3918                      " name as another one declared at %s:%d.\n",
 3919                      file, linenum, args[1], msg->conf.file, msg->conf.line);
 3920                 err_code |= ERR_ALERT | ERR_FATAL;
 3921                 goto out;
 3922             }
 3923         }
 3924 
 3925         if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) {
 3926             ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
 3927             err_code |= ERR_ALERT | ERR_ABORT;
 3928             goto out;
 3929         }
 3930 
 3931         curmsg->id = strdup(args[1]);
 3932         curmsg->id_len = strlen(curmsg->id);
 3933         curmsg->event  = SPOE_EV_NONE;
 3934         curmsg->conf.file = strdup(file);
 3935         curmsg->conf.line = linenum;
 3936         curmsg->nargs = 0;
 3937         LIST_INIT(&curmsg->args);
 3938         LIST_INIT(&curmsg->acls);
 3939         LIST_INIT(&curmsg->by_evt);
 3940         LIST_INIT(&curmsg->by_grp);
 3941         LIST_ADDQ(&curmsgs, &curmsg->list);
 3942     }
 3943     else if (!strcmp(args[0], "args")) {
 3944         int cur_arg = 1;
 3945 
 3946         curproxy->conf.args.ctx  = ARGC_SPOE;
 3947         curproxy->conf.args.file = file;
 3948         curproxy->conf.args.line = linenum;
 3949         while (*args[cur_arg]) {
 3950             char *delim = strchr(args[cur_arg], '=');
 3951             int   idx = 0;
 3952 
 3953             if ((arg = calloc(1, sizeof(*arg))) == NULL) {
 3954                 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
 3955                 err_code |= ERR_ALERT | ERR_ABORT;
 3956                 goto out;
 3957             }
 3958 
 3959             if (!delim) {
 3960                 arg->name = NULL;
 3961                 arg->name_len  = 0;
 3962                 delim = args[cur_arg];
 3963             }
 3964             else {
 3965                 arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]);
 3966                 arg->name_len = delim - args[cur_arg];
 3967                 delim++;
 3968             }
 3969             arg->expr = sample_parse_expr((char*[]){delim, NULL},
 3970                               &idx, file, linenum, &errmsg,
 3971                               &curproxy->conf.args);
 3972             if (arg->expr == NULL) {
 3973                 ha_alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg);
 3974                 err_code |= ERR_ALERT | ERR_FATAL;
 3975                 free(arg->name);
 3976                 free(arg);
 3977                 goto out;
 3978             }
 3979             curmsg->nargs++;
 3980             LIST_ADDQ(&curmsg->args, &arg->list);
 3981             cur_arg++;
 3982         }
 3983         curproxy->conf.args.file = NULL;
 3984         curproxy->conf.args.line = 0;
 3985     }
 3986     else if (!strcmp(args[0], "acl")) {
 3987         err = invalid_char(args[1]);
 3988         if (err) {
 3989             ha_alert("parsing [%s:%d] : character '%c' is not permitted in acl name '%s'.\n",
 3990                  file, linenum, *err, args[1]);
 3991             err_code |= ERR_ALERT | ERR_FATAL;
 3992             goto out;
 3993         }
 3994         if (parse_acl((const char **)args + 1, &curmsg->acls, &errmsg, &curproxy->conf.args, file, linenum) == NULL) {
 3995             ha_alert("parsing [%s:%d] : error detected while parsing ACL '%s' : %s.\n",
 3996                  file, linenum, args[1], errmsg);
 3997             err_code |= ERR_ALERT | ERR_FATAL;
 3998             goto out;
 3999         }
 4000     }
 4001     else if (!strcmp(args[0], "event")) {
 4002         if (!*args[1]) {
 4003             ha_alert("parsing [%s:%d] : missing event name.\n", file, linenum);
 4004             err_code |= ERR_ALERT | ERR_FATAL;
 4005             goto out;
 4006         }
 4007         /* if (alertif_too_many_args(1, file, linenum, args, &err_code)) */
 4008         /*  goto out; */
 4009 
 4010         if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS]))
 4011             curmsg->event = SPOE_EV_ON_CLIENT_SESS;
 4012         else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS]))
 4013             curmsg->event = SPOE_EV_ON_SERVER_SESS;
 4014 
 4015         else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE]))
 4016             curmsg->event = SPOE_EV_ON_TCP_REQ_FE;
 4017         else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE]))
 4018             curmsg->event = SPOE_EV_ON_TCP_REQ_BE;
 4019         else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP]))
 4020             curmsg->event = SPOE_EV_ON_TCP_RSP;
 4021 
 4022         else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE]))
 4023             curmsg->event = SPOE_EV_ON_HTTP_REQ_FE;
 4024         else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE]))
 4025             curmsg->event = SPOE_EV_ON_HTTP_REQ_BE;
 4026         else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP]))
 4027             curmsg->event = SPOE_EV_ON_HTTP_RSP;
 4028         else {
 4029             ha_alert("parsing [%s:%d] : unknown event '%s'.\n",
 4030                  file, linenum, args[1]);
 4031             err_code |= ERR_ALERT | ERR_FATAL;
 4032             goto out;
 4033         }
 4034 
 4035         if (strcmp(args[2], "if") == 0 || strcmp(args[2], "unless") == 0) {
 4036             struct acl_cond *cond;
 4037 
 4038             cond = build_acl_cond(file, linenum, &curmsg->acls,
 4039                           curproxy, (const char **)args+2,
 4040                           &errmsg);
 4041             if (cond == NULL) {
 4042                 ha_alert("parsing [%s:%d] : error detected while "
 4043                      "parsing an 'event %s' condition : %s.\n",
 4044                      file, linenum, args[1], errmsg);
 4045                 err_code |= ERR_ALERT | ERR_FATAL;
 4046                 goto out;
 4047             }
 4048             curmsg->cond = cond;
 4049         }
 4050         else if (*args[2]) {
 4051             ha_alert("parsing [%s:%d]: 'event %s' expects either 'if' "
 4052                  "or 'unless' followed by a condition but found '%s'.\n",
 4053                  file, linenum, args[1], args[2]);
 4054             err_code |= ERR_ALERT | ERR_FATAL;
 4055             goto out;
 4056         }
 4057     }
 4058     else if (!*args[0]) {
 4059         ha_alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n",
 4060              file, linenum, args[0]);
 4061         err_code |= ERR_ALERT | ERR_FATAL;
 4062         goto out;
 4063     }
 4064  out:
 4065     free(errmsg);
 4066     return err_code;
 4067 }
 4068 
 4069 /* Return -1 on error, else 0 */
 4070 static int
 4071 parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
 4072                 struct flt_conf *fconf, char **err, void *private)
 4073 {
 4074     struct list backup_sections;
 4075     struct spoe_config          *conf;
 4076     struct spoe_message         *msg, *msgback;
 4077     struct spoe_group           *grp, *grpback;
 4078     struct spoe_placeholder     *ph, *phback;
 4079     struct spoe_var_placeholder *vph, *vphback;
 4080     struct logsrv               *logsrv, *logsrvback;
 4081     char                        *file = NULL, *engine = NULL;
 4082     int                          ret, pos = *cur_arg + 1;
 4083 
 4084     LIST_INIT(&curmsgs);
 4085     LIST_INIT(&curgrps);
 4086     LIST_INIT(&curmphs);
 4087     LIST_INIT(&curgphs);
 4088     LIST_INIT(&curvars);
 4089     LIST_INIT(&curlogsrvs);
 4090     curpxopts  = 0;
 4091     curpxopts2 = 0;
 4092 
 4093     conf = calloc(1, sizeof(*conf));
 4094     if (conf == NULL) {
 4095         memprintf(err, "%s: out of memory", args[*cur_arg]);
 4096         goto error;
 4097     }
 4098     conf->proxy = px;
 4099 
 4100     while (*args[pos]) {
 4101         if (!strcmp(args[pos], "config")) {
 4102             if (!*args[pos+1]) {
 4103                 memprintf(err, "'%s' : '%s' option without value",
 4104                       args[*cur_arg], args[pos]);
 4105                 goto error;
 4106             }
 4107             file = args[pos+1];
 4108             pos += 2;
 4109         }
 4110         else if (!strcmp(args[pos], "engine")) {
 4111             if (!*args[pos+1]) {
 4112                 memprintf(err, "'%s' : '%s' option without value",
 4113                       args[*cur_arg], args[pos]);
 4114                 goto error;
 4115             }
 4116             engine = args[pos+1];
 4117             pos += 2;
 4118         }
 4119         else {
 4120             memprintf(err, "unknown keyword '%s'", args[pos]);
 4121             goto error;
 4122         }
 4123     }
 4124     if (file == NULL) {
 4125         memprintf(err, "'%s' : missing config file", args[*cur_arg]);
 4126         goto error;
 4127     }
 4128 
 4129     /* backup sections and register SPOE sections */
 4130     LIST_INIT(&backup_sections);
 4131     cfg_backup_sections(&backup_sections);
 4132     cfg_register_section("spoe-agent",   cfg_parse_spoe_agent, NULL);
 4133     cfg_register_section("spoe-group",   cfg_parse_spoe_group, NULL);
 4134     cfg_register_section("spoe-message", cfg_parse_spoe_message, NULL);
 4135 
 4136     /* Parse SPOE filter configuration file */
 4137     curengine = engine;
 4138     curproxy  = px;
 4139     curagent  = NULL;
 4140     curmsg    = NULL;
 4141     ret = readcfgfile(file);
 4142     curproxy = NULL;
 4143 
 4144     /* unregister SPOE sections and restore previous sections */
 4145     cfg_unregister_sections();
 4146     cfg_restore_sections(&backup_sections);
 4147 
 4148     if (ret == -1) {
 4149         memprintf(err, "Could not open configuration file %s : %s",
 4150               file, strerror(errno));
 4151         goto error;
 4152     }
 4153     if (ret & (ERR_ABORT|ERR_FATAL)) {
 4154         memprintf(err, "Error(s) found in configuration file %s", file);
 4155         goto error;
 4156     }
 4157 
 4158     /* Check SPOE agent */
 4159     if (curagent == NULL) {
 4160         memprintf(err, "No SPOE agent found in file %s", file);
 4161         goto error;
 4162     }
 4163     if (curagent->b.name == NULL) {
 4164         memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d",
 4165               curagent->id, curagent->conf.file, curagent->conf.line);
 4166         goto error;
 4167     }
 4168     if (curagent->timeout.hello      == TICK_ETERNITY ||
 4169         curagent->timeout.idle       == TICK_ETERNITY ||
 4170         curagent->timeout.processing == TICK_ETERNITY) {
 4171         ha_warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
 4172                "   | While not properly invalid, you will certainly encounter various problems\n"
 4173                "   | with such a configuration. To fix this, please ensure that all following\n"
 4174                "   | timeouts are set to a non-zero value: 'hello', 'idle', 'processing'.\n",
 4175                px->id, curagent->id, curagent->conf.file, curagent->conf.line);
 4176     }
 4177     if (curagent->var_pfx == NULL) {
 4178         char *tmp = curagent->id;
 4179 
 4180         while (*tmp) {
 4181             if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
 4182                 memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. "
 4183                       "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n",
 4184                       curagent->id, curagent->id, curagent->conf.file, curagent->conf.line);
 4185                 goto error;
 4186             }
 4187             tmp++;
 4188         }
 4189         curagent->var_pfx = strdup(curagent->id);
 4190     }
 4191 
 4192     if (curagent->var_on_error) {
 4193         struct arg arg;
 4194 
 4195         trash.data = snprintf(trash.area, trash.size, "txn.%s.%s",
 4196                      curagent->var_pfx, curagent->var_on_error);
 4197 
 4198         arg.type = ARGT_STR;
 4199         arg.data.str.area = trash.area;
 4200         arg.data.str.data = trash.data;
 4201         arg.data.str.size = 0; /* Set it to 0 to not release it in vars_check_args() */
 4202         if (!vars_check_arg(&arg, err)) {
 4203             memprintf(err, "SPOE agent '%s': failed to register variable %s.%s (%s)",
 4204                   curagent->id, curagent->var_pfx, curagent->var_on_error, *err);
 4205             goto error;
 4206         }
 4207     }
 4208 
 4209     if (curagent->var_t_process) {
 4210         struct arg arg;
 4211 
 4212         trash.data = snprintf(trash.area, trash.size, "txn.%s.%s",
 4213                      curagent->var_pfx, curagent->var_t_process);
 4214 
 4215         arg.type = ARGT_STR;
 4216         arg.data.str.area = trash.area;
 4217         arg.data.str.data = trash.data;
 4218         arg.data.str.size = 0;  /* Set it to 0 to not release it in vars_check_args() */
 4219         if (!vars_check_arg(&arg, err)) {
 4220             memprintf(err, "SPOE agent '%s': failed to register variable %s.%s (%s)",
 4221                   curagent->id, curagent->var_pfx, curagent->var_t_process, *err);
 4222             goto error;
 4223         }
 4224     }
 4225 
 4226     if (curagent->var_t_total) {
 4227         struct arg arg;
 4228 
 4229         trash.data = snprintf(trash.area, trash.size, "txn.%s.%s",
 4230                      curagent->var_pfx, curagent->var_t_total);
 4231 
 4232         arg.type = ARGT_STR;
 4233         arg.data.str.area = trash.area;
 4234         arg.data.str.data = trash.data;
 4235         arg.data.str.size = 0;  /* Set it to 0 to not release it in vars_check_args() */
 4236         if (!vars_check_arg(&arg, err)) {
 4237             memprintf(err, "SPOE agent '%s': failed to register variable %s.%s (%s)",
 4238                   curagent->id, curagent->var_pfx, curagent->var_t_process, *err);
 4239             goto error;
 4240         }
 4241     }
 4242 
 4243     if (LIST_ISEMPTY(&curmphs) && LIST_ISEMPTY(&curgphs)) {
 4244         ha_warning("Proxy '%s': No message/group used by SPOE agent '%s' declared at %s:%d.\n",
 4245                px->id, curagent->id, curagent->conf.file, curagent->conf.line);
 4246         goto finish;
 4247     }
 4248 
 4249     /* Replace placeholders by the corresponding messages for the SPOE
 4250      * agent */
 4251     list_for_each_entry(ph, &curmphs, list) {
 4252         list_for_each_entry(msg, &curmsgs, list) {
 4253             struct spoe_arg *arg;
 4254             unsigned int     where;
 4255 
 4256             if (!strcmp(msg->id, ph->id)) {
 4257                 if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) {
 4258                     if (msg->event == SPOE_EV_ON_TCP_REQ_BE)
 4259                         msg->event = SPOE_EV_ON_TCP_REQ_FE;
 4260                     if (msg->event == SPOE_EV_ON_HTTP_REQ_BE)
 4261                         msg->event = SPOE_EV_ON_HTTP_REQ_FE;
 4262                 }
 4263                 if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS ||
 4264                                    msg->event == SPOE_EV_ON_TCP_REQ_FE ||
 4265                                    msg->event == SPOE_EV_ON_HTTP_REQ_FE)) {
 4266                     ha_warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n",
 4267                            px->id, msg->conf.file, msg->conf.line);
 4268                     goto next_mph;
 4269                 }
 4270                 if (msg->event == SPOE_EV_NONE) {
 4271                     ha_warning("Proxy '%s': Ignore SPOE message '%s' without event at %s:%d.\n",
 4272                            px->id, msg->id, msg->conf.file, msg->conf.line);
 4273                     goto next_mph;
 4274                 }
 4275 
 4276                 where = 0;
 4277                 switch (msg->event) {
 4278                     case SPOE_EV_ON_CLIENT_SESS:
 4279                         where |= SMP_VAL_FE_CON_ACC;
 4280                         break;
 4281 
 4282                     case SPOE_EV_ON_TCP_REQ_FE:
 4283                         where |= SMP_VAL_FE_REQ_CNT;
 4284                         break;
 4285 
 4286                     case SPOE_EV_ON_HTTP_REQ_FE:
 4287                         where |= SMP_VAL_FE_HRQ_HDR;
 4288                         break;
 4289 
 4290                     case SPOE_EV_ON_TCP_REQ_BE:
 4291                         if (px->cap & PR_CAP_FE)
 4292                             where |= SMP_VAL_FE_REQ_CNT;
 4293                         if (px->cap & PR_CAP_BE)
 4294                             where |= SMP_VAL_BE_REQ_CNT;
 4295                         break;
 4296 
 4297                     case SPOE_EV_ON_HTTP_REQ_BE:
 4298                         if (px->cap & PR_CAP_FE)
 4299                             where |= SMP_VAL_FE_HRQ_HDR;
 4300                         if (px->cap & PR_CAP_BE)
 4301                             where |= SMP_VAL_BE_HRQ_HDR;
 4302                         break;
 4303 
 4304                     case SPOE_EV_ON_SERVER_SESS:
 4305                         where |= SMP_VAL_BE_SRV_CON;
 4306                         break;
 4307 
 4308                     case SPOE_EV_ON_TCP_RSP:
 4309                         if (px->cap & PR_CAP_FE)
 4310                             where |= SMP_VAL_FE_RES_CNT;
 4311                         if (px->cap & PR_CAP_BE)
 4312                             where |= SMP_VAL_BE_RES_CNT;
 4313                         break;
 4314 
 4315                     case SPOE_EV_ON_HTTP_RSP:
 4316                         if (px->cap & PR_CAP_FE)
 4317                             where |= SMP_VAL_FE_HRS_HDR;
 4318                         if (px->cap & PR_CAP_BE)
 4319                             where |= SMP_VAL_BE_HRS_HDR;
 4320                         break;
 4321 
 4322                     default:
 4323                         break;
 4324                 }
 4325 
 4326                 list_for_each_entry(arg, &msg->args, list) {
 4327                     if (!(arg->expr->fetch->val & where)) {
 4328                         memprintf(err, "Ignore SPOE message '%s' at %s:%d: "
 4329                             "some args extract information from '%s', "
 4330                             "none of which is available here ('%s')",
 4331                             msg->id, msg->conf.file, msg->conf.line,
 4332                             sample_ckp_names(arg->expr->fetch->use),
 4333                             sample_ckp_names(where));
 4334                         goto error;
 4335                     }
 4336                 }
 4337 
 4338                 msg->agent = curagent;
 4339                 LIST_ADDQ(&curagent->events[msg->event], &msg->by_evt);
 4340                 goto next_mph;
 4341             }
 4342         }
 4343         memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d",
 4344               curagent->id, ph->id, curagent->conf.file, curagent->conf.line);
 4345         goto error;
 4346       next_mph:
 4347         continue;
 4348     }
 4349 
 4350     /* Replace placeholders by the corresponding groups for the SPOE
 4351      * agent */
 4352     list_for_each_entry(ph, &curgphs, list) {
 4353         list_for_each_entry_safe(grp, grpback, &curgrps, list) {
 4354             if (!strcmp(grp->id, ph->id)) {
 4355                 grp->agent = curagent;
 4356                 LIST_DEL(&grp->list);
 4357                 LIST_ADDQ(&curagent->groups, &grp->list);
 4358                 goto next_aph;
 4359             }
 4360         }
 4361         memprintf(err, "SPOE agent '%s' try to use undefined SPOE group '%s' at %s:%d",
 4362               curagent->id, ph->id, curagent->conf.file, curagent->conf.line);
 4363         goto error;
 4364       next_aph:
 4365         continue;
 4366     }
 4367 
 4368     /* Replace placeholders by the corresponding message for each SPOE
 4369      * group of the SPOE agent */
 4370     list_for_each_entry(grp, &curagent->groups, list) {
 4371         list_for_each_entry_safe(ph, phback, &grp->phs, list) {
 4372             list_for_each_entry(msg, &curmsgs, list) {
 4373                 if (!strcmp(msg->id, ph->id)) {
 4374                     if (msg->group != NULL) {
 4375                         memprintf(err, "SPOE message '%s' already belongs to "
 4376                               "the SPOE group '%s' declare at %s:%d",
 4377                               msg->id, msg->group->id,
 4378                               msg->group->conf.file,
 4379                               msg->group->conf.line);
 4380                         goto error;
 4381                     }
 4382 
 4383                     /* Scope for arguments are not checked for now. We will check
 4384                      * them only if a rule use the corresponding SPOE group. */
 4385                     msg->agent = curagent;
 4386                     msg->group = grp;
 4387                     LIST_DEL(&ph->list);
 4388                     LIST_ADDQ(&grp->messages, &msg->by_grp);
 4389                     goto next_mph_grp;
 4390                 }
 4391             }
 4392             memprintf(err, "SPOE group '%s' try to use undefined SPOE message '%s' at %s:%d",
 4393                   grp->id, ph->id, curagent->conf.file, curagent->conf.line);
 4394             goto error;
 4395           next_mph_grp:
 4396             continue;
 4397         }
 4398     }
 4399 
 4400  finish:
 4401     /* move curmsgs to the agent message list */
 4402     curmsgs.n->p = &curagent->messages;
 4403     curmsgs.p->n = &curagent->messages;
 4404     curagent->messages = curmsgs;
 4405     LIST_INIT(&curmsgs);
 4406 
 4407     conf->id    = strdup(engine ? engine : curagent->id);
 4408     conf->agent = curagent;
 4409 
 4410     /* Start agent's proxy initialization here. It will be finished during
 4411      * the filter init. */
 4412         memset(&conf->agent_fe, 0, sizeof(conf->agent_fe));
 4413         init_new_proxy(&conf->agent_fe);
 4414     conf->agent_fe.id        = conf->agent->id;
 4415     conf->agent_fe.parent    = conf->agent;
 4416     conf->agent_fe.options  |= curpxopts;
 4417     conf->agent_fe.options2 |= curpxopts2;
 4418 
 4419     list_for_each_entry_safe(logsrv, logsrvback, &curlogsrvs, list) {
 4420         LIST_DEL(&logsrv->list);
 4421         LIST_ADDQ(&conf->agent_fe.logsrvs, &logsrv->list);
 4422     }
 4423 
 4424     list_for_each_entry_safe(ph, phback, &curmphs, list) {
 4425         LIST_DEL(&ph->list);
 4426         spoe_release_placeholder(ph);
 4427     }
 4428     list_for_each_entry_safe(ph, phback, &curgphs, list) {
 4429         LIST_DEL(&ph->list);
 4430         spoe_release_placeholder(ph);
 4431     }
 4432     list_for_each_entry_safe(vph, vphback, &curvars, list) {
 4433         struct arg arg;
 4434 
 4435         trash.data = snprintf(trash.area, trash.size, "proc.%s.%s",
 4436                      curagent->var_pfx, vph->name);
 4437 
 4438         arg.type = ARGT_STR;
 4439         arg.data.str.area = trash.area;
 4440         arg.data.str.data = trash.data;
 4441         arg.data.str.size = 0;  /* Set it to 0 to not release it in vars_check_args() */
 4442         if (!vars_check_arg(&arg, err)) {
 4443             memprintf(err, "SPOE agent '%s': failed to register variable %s.%s (%s)",
 4444                   curagent->id, curagent->var_pfx, vph->name, *err);
 4445             goto error;
 4446         }
 4447 
 4448         LIST_DEL(&vph->list);
 4449         free(vph->name);
 4450         free(vph);
 4451     }
 4452     list_for_each_entry_safe(grp, grpback, &curgrps, list) {
 4453         LIST_DEL(&grp->list);
 4454         spoe_release_group(grp);
 4455     }
 4456     *cur_arg    = pos;
 4457     fconf->id   = spoe_filter_id;
 4458     fconf->ops  = &spoe_ops;
 4459     fconf->conf = conf;
 4460     return 0;
 4461 
 4462  error:
 4463     spoe_release_agent(curagent);
 4464     list_for_each_entry_safe(ph, phback, &curmphs, list) {
 4465         LIST_DEL(&ph->list);
 4466         spoe_release_placeholder(ph);
 4467     }
 4468     list_for_each_entry_safe(ph, phback, &curgphs, list) {
 4469         LIST_DEL(&ph->list);
 4470         spoe_release_placeholder(ph);
 4471     }
 4472     list_for_each_entry_safe(vph, vphback, &curvars, list) {
 4473         LIST_DEL(&vph->list);
 4474         free(vph->name);
 4475         free(vph);
 4476     }
 4477     list_for_each_entry_safe(grp, grpback, &curgrps, list) {
 4478         LIST_DEL(&grp->list);
 4479         spoe_release_group(grp);
 4480     }
 4481     list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
 4482         LIST_DEL(&msg->list);
 4483         spoe_release_message(msg);
 4484     }
 4485     list_for_each_entry_safe(logsrv, logsrvback, &curlogsrvs, list) {
 4486         LIST_DEL(&logsrv->list);
 4487         free(logsrv);
 4488     }
 4489     free(conf);
 4490     return -1;
 4491 }
 4492 
 4493 /* Send message of a SPOE group. This is the action_ptr callback of a rule
 4494  * associated to a "send-spoe-group" action.
 4495  *
 4496  * It returns ACT_RET_CONT is processing is finished without error, it returns
 4497  * ACT_RET_YIELD if the action is in progress. Otherwise it returns
 4498  * ACT_RET_ERR. */
 4499 static enum act_return
 4500 spoe_send_group(struct act_rule *rule, struct proxy *px,
 4501         struct session *sess, struct stream *s, int flags)
 4502 {
 4503     struct filter      *filter;
 4504     struct spoe_agent   *agent = NULL;
 4505     struct spoe_group   *group = NULL;
 4506     struct spoe_context *ctx   = NULL;
 4507     int ret, dir;
 4508 
 4509     list_for_each_entry(filter, &s->strm_flt.filters, list) {
 4510         if (filter->config == rule->arg.act.p[0]) {
 4511             agent = rule->arg.act.p[2];
 4512             group = rule->arg.act.p[3];
 4513             ctx   = filter->ctx;
 4514             break;
 4515         }
 4516     }
 4517     if (agent == NULL || group == NULL || ctx == NULL)
 4518         return ACT_RET_ERR;
 4519     if (ctx->state == SPOE_CTX_ST_NONE)
 4520         return ACT_RET_CONT;
 4521 
 4522     switch (rule->from) {
 4523         case ACT_F_TCP_REQ_SES: dir = SMP_OPT_DIR_REQ; break;
 4524         case ACT_F_TCP_REQ_CNT: dir = SMP_OPT_DIR_REQ; break;
 4525         case ACT_F_TCP_RES_CNT: dir = SMP_OPT_DIR_RES; break;
 4526         case ACT_F_HTTP_REQ:    dir = SMP_OPT_DIR_REQ; break;
 4527         case ACT_F_HTTP_RES:    dir = SMP_OPT_DIR_RES; break;
 4528         default:
 4529             SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 4530                     " - internal error while execute spoe-send-group\n",
 4531                     (int)now.tv_sec, (int)now.tv_usec, agent->id,
 4532                     __FUNCTION__, s);
 4533             send_log(px, LOG_ERR, "SPOE: [%s] internal error while execute spoe-send-group\n",
 4534                  agent->id);
 4535             return ACT_RET_CONT;
 4536     }
 4537 
 4538     ret = spoe_process_group(s, ctx, group, dir);
 4539     if (ret == 1)
 4540         return ACT_RET_CONT;
 4541     else if (ret == 0) {
 4542         if (flags & ACT_FLAG_FINAL) {
 4543             SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 4544                     " - failed to process group '%s': interrupted by caller\n",
 4545                     (int)now.tv_sec, (int)now.tv_usec,
 4546                     agent->id, __FUNCTION__, s, group->id);
 4547             ctx->status_code = SPOE_CTX_ERR_INTERRUPT;
 4548             spoe_stop_processing(agent, ctx);
 4549             spoe_handle_processing_error(s, agent, ctx, dir);
 4550             return ACT_RET_CONT;
 4551         }
 4552         return ACT_RET_YIELD;
 4553     }
 4554     else
 4555         return ACT_RET_ERR;
 4556 }
 4557 
 4558 /* Check an "send-spoe-group" action. Here, we'll try to find the real SPOE
 4559  * group associated to <rule>. The format of an rule using 'send-spoe-group'
 4560  * action should be:
 4561  *
 4562  *   (http|tcp)-(request|response) send-spoe-group <engine-id> <group-id>
 4563  *
 4564  * So, we'll loop on each configured SPOE filter for the proxy <px> to find the
 4565  * SPOE engine matching <engine-id>. And then, we'll try to find the good group
 4566  * matching <group-id>. Finally, we'll check all messages referenced by the SPOE
 4567  * group.
 4568  *
 4569  * The function returns 1 in success case, otherwise, it returns 0 and err is
 4570  * filled.
 4571  */
 4572 static int
 4573 check_send_spoe_group(struct act_rule *rule, struct proxy *px, char **err)
 4574 {
 4575     struct flt_conf     *fconf;
 4576     struct spoe_config  *conf;
 4577     struct spoe_agent   *agent = NULL;
 4578     struct spoe_group   *group;
 4579     struct spoe_message *msg;
 4580     char                *engine_id = rule->arg.act.p[0];
 4581     char                *group_id  = rule->arg.act.p[1];
 4582     unsigned int         where = 0;
 4583 
 4584     switch (rule->from) {
 4585         case ACT_F_TCP_REQ_SES: where = SMP_VAL_FE_SES_ACC; break;
 4586         case ACT_F_TCP_REQ_CNT: where = SMP_VAL_FE_REQ_CNT; break;
 4587         case ACT_F_TCP_RES_CNT: where = SMP_VAL_BE_RES_CNT; break;
 4588         case ACT_F_HTTP_REQ:    where = SMP_VAL_FE_HRQ_HDR; break;
 4589         case ACT_F_HTTP_RES:    where = SMP_VAL_BE_HRS_HDR; break;
 4590         default:
 4591             memprintf(err,
 4592                   "internal error, unexpected rule->from=%d, please report this bug!",
 4593                   rule->from);
 4594             goto error;
 4595     }
 4596 
 4597     /* Try to find the SPOE engine by checking all SPOE filters for proxy
 4598      * <px> */
 4599     list_for_each_entry(fconf, &px->filter_configs, list) {
 4600         conf = fconf->conf;
 4601 
 4602         /* This is not an SPOE filter */
 4603         if (fconf->id != spoe_filter_id)
 4604             continue;
 4605 
 4606         /* This is the good engine */
 4607         if (!strcmp(conf->id, engine_id)) {
 4608             agent = conf->agent;
 4609             break;
 4610         }
 4611     }
 4612     if (agent == NULL) {
 4613         memprintf(err, "unable to find SPOE engine '%s' used by the send-spoe-group '%s'",
 4614               engine_id, group_id);
 4615         goto error;
 4616     }
 4617 
 4618     /* Try to find the right group */
 4619     list_for_each_entry(group, &agent->groups, list) {
 4620         /* This is the good group */
 4621         if (!strcmp(group->id, group_id))
 4622             break;
 4623     }
 4624     if (&group->list == &agent->groups) {
 4625         memprintf(err, "unable to find SPOE group '%s' into SPOE engine '%s' configuration",
 4626               group_id, engine_id);
 4627         goto error;
 4628     }
 4629 
 4630     /* Ok, we found the group, we need to check messages and their
 4631      * arguments */
 4632     list_for_each_entry(msg, &group->messages, by_grp) {
 4633         struct spoe_arg *arg;
 4634 
 4635         list_for_each_entry(arg, &msg->args, list) {
 4636             if (!(arg->expr->fetch->val & where)) {
 4637                 memprintf(err, "Invalid SPOE message '%s' used by SPOE group '%s' at %s:%d: "
 4638                       "some args extract information from '%s',"
 4639                       "none of which is available here ('%s')",
 4640                       msg->id, group->id, msg->conf.file, msg->conf.line,
 4641                       sample_ckp_names(arg->expr->fetch->use),
 4642                       sample_ckp_names(where));
 4643                 goto error;
 4644             }
 4645         }
 4646     }
 4647 
 4648     free(engine_id);
 4649     free(group_id);
 4650     rule->arg.act.p[0] = fconf; /* Associate filter config with the rule */
 4651     rule->arg.act.p[1] = conf;  /* Associate SPOE config with the rule */
 4652     rule->arg.act.p[2] = agent; /* Associate SPOE agent with the rule */
 4653     rule->arg.act.p[3] = group; /* Associate SPOE group with the rule */
 4654     return 1;
 4655 
 4656   error:
 4657     free(engine_id);
 4658     free(group_id);
 4659     return 0;
 4660 }
 4661 
 4662 /* Parse 'send-spoe-group' action following the format:
 4663  *
 4664  *     ... send-spoe-group <engine-id> <group-id>
 4665  *
 4666  * It returns ACT_RET_PRS_ERR if fails and <err> is filled with an error
 4667  * message. Otherwise, it returns ACT_RET_PRS_OK and parsing engine and group
 4668  * ids are saved and used later, when the rule will be checked.
 4669  */
 4670 static enum act_parse_ret
 4671 parse_send_spoe_group(const char **args, int *orig_arg, struct proxy *px,
 4672               struct act_rule *rule, char **err)
 4673 {
 4674     if (!*args[*orig_arg] || !*args[*orig_arg+1] ||
 4675         (*args[*orig_arg+2] && strcmp(args[*orig_arg+2], "if") != 0 && strcmp(args[*orig_arg+2], "unless") != 0)) {
 4676         memprintf(err, "expects 2 arguments: <engine-id> <group-id>");
 4677         return ACT_RET_PRS_ERR;
 4678     }
 4679     rule->arg.act.p[0] = strdup(args[*orig_arg]);   /* Copy the SPOE engine id */
 4680     rule->arg.act.p[1] = strdup(args[*orig_arg+1]); /* Cope the SPOE group id */
 4681 
 4682     (*orig_arg) += 2;
 4683 
 4684     rule->action     = ACT_CUSTOM;
 4685     rule->action_ptr = spoe_send_group;
 4686     rule->check_ptr  = check_send_spoe_group;
 4687     return ACT_RET_PRS_OK;
 4688 }
 4689 
 4690 
 4691 /* Declare the filter parser for "spoe" keyword */
 4692 static struct flt_kw_list flt_kws = { "SPOE", { }, {
 4693         { "spoe", parse_spoe_flt, NULL },
 4694         { NULL, NULL, NULL },
 4695     }
 4696 };
 4697 
 4698 INITCALL1(STG_REGISTER, flt_register_keywords, &flt_kws);
 4699 
 4700 /* Delcate the action parser for "spoe-action" keyword */
 4701 static struct action_kw_list tcp_req_action_kws = { { }, {
 4702         { "send-spoe-group", parse_send_spoe_group },
 4703         { /* END */ },
 4704     }
 4705 };
 4706 
 4707 INITCALL1(STG_REGISTER, tcp_req_cont_keywords_register, &tcp_req_action_kws);
 4708 
 4709 static struct action_kw_list tcp_res_action_kws = { { }, {
 4710         { "send-spoe-group", parse_send_spoe_group },
 4711         { /* END */ },
 4712     }
 4713 };
 4714 
 4715 INITCALL1(STG_REGISTER, tcp_res_cont_keywords_register, &tcp_res_action_kws);
 4716 
 4717 static struct action_kw_list http_req_action_kws = { { }, {
 4718         { "send-spoe-group", parse_send_spoe_group },
 4719         { /* END */ },
 4720     }
 4721 };
 4722 
 4723 INITCALL1(STG_REGISTER, http_req_keywords_register, &http_req_action_kws);
 4724 
 4725 static struct action_kw_list http_res_action_kws = { { }, {
 4726         { "send-spoe-group", parse_send_spoe_group },
 4727         { /* END */ },
 4728     }
 4729 };
 4730 
 4731 INITCALL1(STG_REGISTER, http_res_keywords_register, &http_res_action_kws);