"Fossies" - the Fresh Open Source Software Archive

Member "Pound-3.0.2/src/http2.c" (28 Nov 2021, 29521 Bytes) of package /linux/www/Pound-3.0.2.tgz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "http2.c", see the Fossies "Dox" file reference documentation.

    1 /*
    2  * Pound - the reverse-proxy load-balancer
    3  * Copyright (C) 2002-2020 Apsis GmbH
    4  *
    5  * This file is part of Pound.
    6  *
    7  * Pound is free software; you can redistribute it and/or modify
    8  * it under the terms of the GNU General Public License as published by
    9  * the Free Software Foundation; either version 3 of the License, or
   10  * (at your option) any later version.
   11  *
   12  * Pound is distributed in the hope that it will be useful,
   13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
   14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   15  * GNU General Public License for more details.
   16  *
   17  * You should have received a copy of the GNU General Public License
   18  * along with this program.  If not, see <http://www.gnu.org/licenses/> .
   19  *
   20  * Contact information:
   21  * Apsis GmbH
   22  * P.O.Box
   23  * 8707 Uetikon am See
   24  * Switzerland
   25  * EMail: roseg@apsis.ch
   26  */
   27 
   28 #include    "pound.h"
   29 
/*
 * frame types
 *
 * Values of the one-byte "type" field of a frame header (see get_frame /
 * put_frame below). The numbering follows the HTTP/2 frame type registry.
 */
#define F_DATA      0x0
#define F_HEADERS   0x1
#define F_PRIORITY  0x2
#define F_RST       0x3
#define F_SETTINGS  0x4
#define F_PUSH      0x5
#define F_PING      0x6
#define F_GOAWAY    0x7
#define F_WINUPD    0x8
#define F_CONT      0x9

/*
 * error flags
 *
 * 32-bit error codes written as the payload of RST_STREAM frames (see
 * put_reject) and as the second word of GOAWAY frames.
 */
#define E_NO_ERROR              0x0
#define E_PROTOCOL_ERROR        0x1
#define E_INTERNAL_ERROR        0x2
#define E_FLOW_CONTROL_ERROR    0x3
#define E_SETTINGS_TIMEOUT      0x4
#define E_STREAM_CLOSED         0x5
#define E_FRAME_SIZE_ERROR      0x6
#define E_REFUSED_STREAM        0x7
#define E_CANCEL                0x8
#define E_COMPRESSION_ERROR     0x9
#define E_CONNECT_ERROR         0xa
#define E_ENHANCE_YOUR_CALM     0xb
#define E_INADEQUATE_SECURITY   0xc
#define E_HTTP_1_1_REQUIRED     0xd

/*
 * settings types
 *
 * Identifiers of the SETTINGS parameters this file interprets; any other
 * identifier is logged and ignored by do_http2:
 *   S_TABSIZE    - stored in TABSIZE (size passed to hpack_table_new)
 *   S_MAXSTREAMS - stored in MAXSTREAMS
 *   S_MAXFRAME   - stored in FRAMESIZE (size of the DATA read buffer)
 */
#define S_TABSIZE       0x1
#define S_MAXSTREAMS    0x3
#define S_MAXFRAME      0x5
   62 
/*
 * In-memory form of a frame header. On the wire (see get_frame/put_frame)
 * it is 3 bytes of length, 1 byte of type, 1 byte of flags and 4 bytes of
 * stream id, all big-endian.
 */
typedef struct {
    int             length;     /* payload length in bytes (header excluded) */
    unsigned char   type;       /* one of the F_* frame types */
    unsigned char   flags;      /* per-type flag bits */
    unsigned int    stream_id;  /* stream the frame belongs to; 0 is used for connection-level frames */
} FRAME;

/* One SETTINGS entry: 2-byte identifier followed by a 4-byte value on the wire */
typedef struct {
    unsigned int    id;         /* one of the S_* identifiers (others are ignored) */
    unsigned int    val;
} SETTINGS;

/* uthash entry recording a stream id that must not be used again; keyed by stream_id */
typedef struct {
    int             stream_id;
    UT_hash_handle  hh;
} CLOSED_STREAM;

/*
 * Progress of a request on an active stream, as driven by do_http2:
 *   HEADERS  - initial state set by add_active; request headers expected
 *   DATA     - headers received without end-of-stream; body frames expected
 *   TRAILERS - a second HEADERS frame arrived while in DATA
 *   REPLY    - request fully forwarded; the reply is read from the back-end
 */
typedef enum { HEADERS, DATA, TRAILERS, REPLY }   STREAM_STATE;

/* uthash entry describing a stream currently being processed; keyed by stream_id */
typedef struct {
    int                         stream_id;
    STREAM_STATE                state;
    int                         at_eos;     /* non-zero once a frame with the end-of-stream flag (0x01) was seen */
    struct hpack_table          *h_tab;     /* HPACK table created by add_active for this stream */
    struct hpack_headerblock    *h_headers, *h_trailers;    /* decoded header/trailer blocks, NULL until decoded */
    int                         s_be;       /* back-end socket returned by get_be; -1 while none is attached */
    UT_hash_handle              hh;
} ACTIVE_STREAM;
   91 
   92 static int
   93 read_int(FILE *f, int n, jmp_buf *jmp_err)
   94 {
   95     int cin, res;
   96 
   97     for(res = 0; n > 0; n--) {
   98         if((cin = getc(f)) == EOF)
   99             longjmp(*jmp_err, 1);
  100         res = (res << 8) | (cin & 0xFF);
  101     }
  102     return res;
  103 }
  104 
  105 static void
  106 write_int(FILE *f, int val, int n, jmp_buf *jmp_err)
  107 {
  108     int byte;
  109 
  110     while(n > 0) {
  111         byte = (val >> ((n - 1) * 8)) & 0xFF;
  112         if(putc(byte, f) == EOF)
  113             longjmp(*jmp_err, 1);
  114         n--;
  115     }
  116     return;
  117 }
  118 
  119 #define get_frame(FH, F, E)     { memset((FH), '\0', sizeof(FRAME)); (FH)->length = read_int((F), 3, (E)); (FH)->type = read_int((F), 1, (E)); (FH)->flags = read_int((F), 1, (E)); (FH)->stream_id = read_int((F), 4, (E)); }
  120 #define put_frame(FH, F, E)     { write_int((F), (FH)->length, 3, (E)); write_int((F), (FH)->type, 1, (E)); write_int((F), (FH)->flags, 1, (E)); write_int((F), (FH)->stream_id, 4, (E)); }
  121 #define get_settings(S, F, E)   { memset((S), '\0', sizeof(SETTINGS)); (S)->id = read_int((F), 2, (E)); (S)->val = read_int((F), 4, (E)); }
  122 #define put_settings(S, F, E)   { write_int((F), (S)->id, 2, (E)); write_int((F), (S)->val, 4, (E)); }
  123 #define get_rst(C, F, E)        { *(C) = read_int((F), 4, (E)); }
  124 #define put_rst(C, F, E)        { write_int((F), *(C), 4, (E)); }
  125 
  126 static void
  127 close_be(int s_be)
  128 {
  129     struct timespec t_wait;
  130 
  131     if(s_be < 0)
  132         return;
  133     nn_send(s_be, "", 0, 0);
  134     /* sleep to make sure all messages are through before closing the channel */
  135     t_wait.tv_sec = 0;
  136     t_wait.tv_nsec = 1000000;
  137     nanosleep(&t_wait, NULL);
  138     nn_close(s_be);
  139     return;
  140 }
  141 
  142 static int
  143 put_msg(FILE *f_client, int stream_id, char *reason, char *body, int TABSIZE, int FRAMESIZE, jmp_buf *jmp_err)
  144 {
  145     FRAME   rep;
  146     unsigned char   *content;
  147     struct hpack_table *tab;
  148     struct hpack_headerblock *headers;
  149     int total;
  150     size_t  length;
  151 
  152     memset(&rep, '\0', sizeof(rep));
  153     rep.stream_id = stream_id;
  154     rep.type = F_HEADERS;
  155     rep.flags = 0x04;
  156     if(body == NULL)
  157         rep.flags |= 0x01;
  158     headers = hpack_headerblock_new();
  159     if(hpack_header_add(headers, ":status", reason, HPACK_INDEX) == NULL) {
  160         hpack_headerblock_free(headers);
  161         return 0;
  162     }
  163     tab = hpack_table_new(TABSIZE);
  164     if((content = hpack_encode(headers, &length, tab)) == NULL) {
  165         hpack_table_free(tab);
  166         hpack_headerblock_free(headers);
  167         return 0;
  168     }
  169     rep.length = length;
  170     put_frame(&rep, f_client, jmp_err);
  171     fwrite(content, 1, rep.length, f_client);
  172     total = 9 + rep.length;
  173     hpack_table_free(tab);
  174     hpack_headerblock_free(headers);
  175     free(content);
  176     if(body != NULL) {
  177         memset(&rep, '\0', sizeof(rep));
  178         rep.stream_id = stream_id + 1;
  179         rep.type = F_DATA;
  180         rep.flags = 0x01;
  181         rep.length = strlen(body);
  182         put_frame(&rep, f_client, jmp_err);
  183         fwrite(body, 1, rep.length, f_client);
  184         total += 9 + rep.length;
  185     }
  186 
  187     return total;
  188 }
  189 
  190 static void
  191 put_reject(FILE *f_client, int stream_id, int code, jmp_buf *jmp_err)
  192 {
  193     FRAME   rep;
  194 
  195     rep.stream_id = stream_id;
  196     rep.length = 0;
  197     rep.flags = 0;
  198     rep.type = F_RST;
  199     put_frame(&rep, f_client, jmp_err);
  200     write_int(f_client, code, 4, jmp_err);
  201     return;
  202 }
  203 
  204 static ACTIVE_STREAM *
  205 add_active(ACTIVE_STREAM **as, int stream_id, int TABSIZE)
  206 {
  207     ACTIVE_STREAM   *s;
  208 
  209     if((s = malloc(sizeof(ACTIVE_STREAM))) == NULL)
  210         return NULL;
  211     s->stream_id = stream_id;
  212     s->h_headers = s->h_trailers = NULL;
  213     s->h_tab = hpack_table_new(TABSIZE);
  214     s->s_be = -1;
  215     s->state = HEADERS;
  216     s->at_eos = 0;
  217     HASH_ADD_INT(*as, stream_id, s);
  218     logmsg(3, "%lX added %d to active %s:%d", pthread_self(), stream_id, __FILE__, __LINE__);
  219     return s;
  220 }
  221 
  222 static void
  223 add_closed(CLOSED_STREAM **cs, int stream_id)
  224 {
  225     CLOSED_STREAM   *s;
  226 
  227     if((s = malloc(sizeof(CLOSED_STREAM))) == NULL)
  228         return;
  229     s->stream_id = stream_id;
  230     HASH_ADD_INT(*cs, stream_id, s);
  231     logmsg(3, "%lX added %d to closed %s:%d", pthread_self(), stream_id, __FILE__, __LINE__);
  232     return;
  233 }
  234 
  235 static void
  236 del_active(ACTIVE_STREAM **active_streams, CLOSED_STREAM **closed_streams, int stream_id)
  237 {
  238     ACTIVE_STREAM   *s;
  239 
  240     HASH_FIND_INT(*active_streams, &stream_id, s);
  241     if(s != NULL) {
  242         if(s->h_headers != NULL)
  243             hpack_headerblock_free(s->h_headers);
  244         if(s->h_trailers != NULL)
  245             hpack_headerblock_free(s->h_trailers);
  246         hpack_table_free(s->h_tab);
  247         close_be(s->s_be);
  248         HASH_DEL(*active_streams, s);
  249         logmsg(3, "%lX removed %d from active %s:%d", pthread_self(), stream_id, __FILE__, __LINE__);
  250         free(s);
  251     }
  252     add_closed(closed_streams, stream_id);
  253     return;
  254 }
  255 
  256 #define cleanup()   {\
  257     if(active_streams != NULL) \
  258         HASH_ITER(hh, active_streams, as, tas) { \
  259             hpack_headerblock_free(as->h_headers); \
  260             hpack_table_free(as->h_tab); \
  261             close_be(as->s_be); \
  262             HASH_DEL(active_streams, as); \
  263             free(as); \
  264         } \
  265     if(closed_streams != NULL) \
  266         HASH_ITER(hh, closed_streams, cs, tcs) { \
  267             HASH_DEL(closed_streams, cs); \
  268             free(cs); \
  269         } \
  270     if(content != NULL) \
  271         free(content); \
  272 }
  273 
  274 void
  275 do_http2(HTTP_LISTENER *http, FILE *f_client, char *peer_name, char *crt_buf, int upgrade_h2)
  276 {
  277     FRAME           header;
  278     SETTINGS        settings;
  279     unsigned char   *content, *buf;
  280     char            *msg;
  281     int             n, err, last_stream, v1, v2, h_size;
  282     int             TABSIZE, FRAMESIZE, MAXSTREAMS;
  283     jmp_buf         err_jmp;
  284     ACTIVE_STREAM   *active_streams, *as, *tas;
  285     CLOSED_STREAM   *closed_streams, *cs, *tcs;
  286     struct hpack_headerblock    *h_cont;
  287     struct timespec t_wait;
  288 
  289     TABSIZE = H2TABSIZE;
  290     FRAMESIZE = H2FRAMESIZE;
  291     MAXSTREAMS = 0;
  292     active_streams = NULL;
  293     closed_streams = NULL;
  294     content = NULL;
  295 
  296     logmsg(1, "%lX start do_http2 %s:%d", pthread_self(), __FILE__, __LINE__);
  297     if(setjmp(err_jmp)) {
  298         logmsg(0, "%lX Unexpected error %s:%d", pthread_self(), __FILE__, __LINE__);
  299         cleanup();
  300         return;
  301     }
  302 
  303     get_frame(&header, f_client, &err_jmp);
  304     if(header.type == F_SETTINGS && !(header.flags & 0x01)) {
  305         for(n = header.length; n > 0; n -= 6) {
  306             get_settings(&settings, f_client, &err_jmp);
  307             if(settings.id == S_TABSIZE)
  308                 TABSIZE = settings.val;
  309             else if(settings.id == S_MAXFRAME)
  310                 FRAMESIZE = settings.val;
  311             else if(settings.id == S_MAXSTREAMS)
  312                 MAXSTREAMS = settings.val;
  313             else
  314                 logmsg(4, "%lX ignored settings %d/%d %s:%d", pthread_self(), settings.id, settings.val, __FILE__, __LINE__);
  315         }
  316     }
  317     logmsg(3, "%lX TABSIZE %d FRAMESIZE %d MAXSTREAMS %d %s:%d", pthread_self(), TABSIZE, FRAMESIZE, MAXSTREAMS, __FILE__, __LINE__);
  318     if(MAXSTREAMS <= 0)
  319         MAXSTREAMS = 4;
  320     if((content = malloc(FRAMESIZE)) == NULL) {
  321         logmsg(0, "%lX HTTP2 content: out of memory %s:%d", pthread_self(), __FILE__, __LINE__);
  322         return;
  323     }
  324 
  325     /* my settings - same as client */
  326     header.length = 18;
  327     header.type = 0x04;
  328     header.flags = 0;
  329     header.stream_id = 0;
  330     put_frame(&header, f_client, &err_jmp);
  331     settings.id = S_TABSIZE;
  332     settings.val = TABSIZE;
  333     put_settings(&settings, f_client, &err_jmp);
  334     settings.id = S_MAXFRAME;
  335     settings.val = FRAMESIZE;
  336     put_settings(&settings, f_client, &err_jmp);
  337     settings.id = S_MAXSTREAMS;
  338     settings.val = MAXSTREAMS;
  339     put_settings(&settings, f_client, &err_jmp);
  340     logmsg(3, "%lX sent my SETTINGS %s:%d", pthread_self(), __FILE__, __LINE__);
  341 
  342     /* ACK */
  343     header.length = 0;
  344     header.type = F_SETTINGS;
  345     header.flags = 0x01;
  346     header.stream_id = 0;
  347     put_frame(&header, f_client, &err_jmp);
  348     logmsg(3, "%lX ACK SETTINGS %s:%d", pthread_self(), __FILE__, __LINE__);
  349 
  350     /* close stream 1 if needed - only for plain upgraded connections */
  351     if(upgrade_h2) {
  352         if((cs = malloc(sizeof(CLOSED_STREAM))) == NULL) {
  353             logmsg(2, "%lX http2: out of memory for closed streams %s:%d", pthread_self(), __FILE__, __LINE__);
  354             cleanup();
  355             return;
  356         }
  357         cs->stream_id = 1;
  358         HASH_ADD_INT(closed_streams, stream_id, cs);
  359         put_reject(f_client, cs->stream_id, E_REFUSED_STREAM, &err_jmp);
  360         logmsg(3, "%lX REJECT stream 1 %s:%d", pthread_self(), __FILE__, __LINE__);
  361     }
  362 
  363     /* Frames we care about:
  364         HEADERS
  365         CONTINUATION
  366         DATA
  367         RST_STREAM
  368         SETTINGS        ==> only changes in the table size
  369         PING
  370         GOAWAY
  371 
  372         Frames we must react to but otherwise ignore:
  373         PUSH_PROMISE    ==> RST_STREAM, set as closed, ignore further frames
  374 
  375         Frames we can ignore:
  376         WINDOW_UPDATE
  377         PRIORITY
  378 
  379         Frames we send:
  380         HEADERS
  381         CONTINUATION
  382         DATA
  383         GOAWAY
  384     */
  385     for(;;) {
  386         get_frame(&header, f_client, &err_jmp);
  387         switch(header.type) {
  388         case F_HEADERS:
  389             logmsg(4, "%lX received HEADER %d length %d flags %x %s:%d", pthread_self(), header.stream_id, header.length, header.flags, __FILE__, __LINE__);
  390             HASH_FIND_INT(closed_streams, &header.stream_id, cs);
  391             if(cs != NULL) {
  392                 /* header for a closed stream -> RST/PRTOCOL_ERROR */
  393                 put_reject(f_client, header.stream_id, E_PROTOCOL_ERROR, &err_jmp);
  394                 logmsg(2, "%lX stream %d already closed %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  395                 continue;
  396             }
  397             HASH_FIND_INT(active_streams, &header.stream_id, as);
  398             if(as == NULL) {
  399                 if((as = add_active(&active_streams, header.stream_id, TABSIZE)) == NULL) {
  400                     logmsg(2, "%lX stream %d failed to add to active %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  401                     put_msg(f_client, header.stream_id + 1, "500", global.err500, TABSIZE, FRAMESIZE, &err_jmp);
  402                     add_closed(&closed_streams, header.stream_id);
  403                     continue;
  404                 }
  405             } else {
  406                 if(as->state == DATA) {
  407                     as->state = TRAILERS;
  408                 } else {
  409                     /* second HEADER frame for an already open stream -> RST/PRTOCOL_ERROR */
  410                     logmsg(2, "%lX stream %d repeated HEADERS %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  411                     del_active(&active_streams, &closed_streams, header.stream_id);
  412                     put_reject(f_client, header.stream_id, E_PROTOCOL_ERROR, &err_jmp);
  413                     continue;
  414                 }
  415             }
  416             if(header.flags & 0x08) {
  417                 /* padding */
  418                 v1 = read_int(f_client, 1, &err_jmp);
  419                 v2 = 1;
  420             } else
  421                 v1 = v2 = 0;
  422             if(header.flags & 0x20) {
  423                 /* stream dependency - ignored */
  424                 n = read_int(f_client, 4, &err_jmp);
  425                 logmsg(4, "%lX %d depends on %d %s:%d", pthread_self(), header.stream_id, n, __FILE__, __LINE__);
  426                 /* weight - ignored */
  427                 (void)read_int(f_client, 1, &err_jmp);
  428                 v2 += 5;
  429             }
  430             logmsg(4, "%lX length %d v1 %d v2 %d %s:%d", pthread_self(), header.length, v1, v2, __FILE__, __LINE__);
  431             if((buf = malloc(header.length - v1 - v2)) == NULL) {
  432                 logmsg(2, "%lX stream %d out of memory %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  433                 del_active(&active_streams, &closed_streams, header.stream_id);
  434                 put_reject(f_client, header.stream_id, E_PROTOCOL_ERROR, &err_jmp);
  435                 continue;
  436             }
  437             h_size = header.length - v1 - v2;
  438             if(fread(buf, 1, h_size, f_client) != h_size) {
  439                 logmsg(2, "%lX stream %d failed to read content %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  440                 put_reject(f_client, header.stream_id, E_PROTOCOL_ERROR, &err_jmp);
  441                 free(buf);
  442                 cleanup();
  443                 return;
  444             }
  445             if(v1 > 0)
  446                 fread(content, 1, v1, f_client);
  447             as->at_eos = (header.flags & 0x01);
  448             if(as->state == TRAILERS && !as->at_eos) {
  449                 logmsg(2, "%lX stream %d trailers but nor end-of-stream %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  450                 put_reject(f_client, as->stream_id, E_PROTOCOL_ERROR, &err_jmp);
  451                 del_active(&active_streams, &closed_streams, as->stream_id);
  452                 free(buf);
  453                 continue;
  454             }
  455             while(!(header.flags & 0x04)) {
  456                 get_frame(&header, f_client, &err_jmp);
  457                 if(header.type != F_CONT || header.stream_id != as->stream_id) {
  458                     logmsg(2, "%lX stream %d HEADER not followed by CONTINUATION %s:%d", pthread_self(), as->stream_id, __FILE__, __LINE__);
  459                     put_reject(f_client, as->stream_id, E_PROTOCOL_ERROR, &err_jmp);
  460                     if(header.stream_id != as->stream_id) {
  461                         put_reject(f_client, header.stream_id, E_PROTOCOL_ERROR, &err_jmp);
  462                         del_active(&active_streams, &closed_streams, header.stream_id);
  463                     }
  464                     del_active(&active_streams, &closed_streams, as->stream_id);
  465                     free(buf);
  466                     buf = NULL;
  467                     break;
  468                 }
  469                 if((buf = realloc(buf, h_size + header.length)) == NULL) {
  470                     logmsg(2, "%lX stream %d out of memory %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  471                     put_reject(f_client, as->stream_id, E_PROTOCOL_ERROR, &err_jmp);
  472                     del_active(&active_streams, &closed_streams, as->stream_id);
  473                     break;
  474                 }
  475                 if(fread(buf + h_size, 1, header.length, f_client) != header.length) {
  476                     logmsg(2, "%lX stream %d failed to read content %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  477                     put_reject(f_client, header.stream_id, E_PROTOCOL_ERROR, &err_jmp);
  478                     free(buf);
  479                     cleanup();
  480                     return;
  481                 }
  482                 h_size += header.length;
  483             }
  484             if(as->state == HEADERS) {
  485                 if((as->h_headers = hpack_decode(buf, h_size, as->h_tab)) == NULL) {
  486                     logmsg(2, "%lX stream %d failed to decode %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  487                     put_reject(f_client, header.stream_id, E_PROTOCOL_ERROR, &err_jmp);
  488                     free(buf);
  489                     cleanup();
  490                     return;
  491                 }
  492             } else {
  493                 if((as->h_trailers = hpack_decode(buf, h_size, as->h_tab)) == NULL) {
  494                     logmsg(2, "%lX stream %d failed to decode %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  495                     put_reject(f_client, header.stream_id, E_PROTOCOL_ERROR, &err_jmp);
  496                     free(buf);
  497                     cleanup();
  498                     return;
  499                 }
  500             }
  501             free(buf);
  502             if(as->state == HEADERS) {
  503                 if((as->s_be = get_be(http, peer_name, NULL, NULL, as->h_headers)) < 0) {
  504                     logmsg(2, "%lX stream %d no back-end %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  505                     put_msg(f_client, header.stream_id + 1, "500", global.err500, TABSIZE, FRAMESIZE, &err_jmp);
  506                     del_active(&active_streams, &closed_streams, header.stream_id);
  507                     continue;
  508                 }
  509                 if(crt_buf[0])
  510                     hpack_header_add(as->h_headers, "X-Pound-Cert", crt_buf, HPACK_INDEX);
  511                 v1 = 2;
  512                 nn_send(as->s_be, &v1, sizeof(int), 0);
  513                 nn_send(as->s_be, peer_name, strlen(peer_name) + 1, 0);
  514                 nn_send(as->s_be, &FRAMESIZE, sizeof(int), 0);
  515                 nn_send(as->s_be, &TABSIZE, sizeof(int), 0);
  516                 nn_send(as->s_be, &as->h_headers, sizeof(as->h_headers), 0);
  517                 if(as->at_eos) {
  518                     nn_send(as->s_be, "", 0, 0);    /* no content */
  519                     nn_send(as->s_be, "", 0, 0);    /* no trailers */
  520                     as->state = REPLY;
  521                 } else
  522                     as->state = DATA;
  523             } else {
  524                 nn_send(as->s_be, &as->h_trailers, sizeof(as->h_trailers), 0);
  525                 as->state = REPLY;
  526             }
  527             break;
  528         case F_CONT:
  529             logmsg(4, "%lX received CONT not following HEAD %d %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  530             put_reject(f_client, header.stream_id, E_PROTOCOL_ERROR, &err_jmp);
  531             del_active(&active_streams, &closed_streams, header.stream_id);
  532             break;
  533         case F_DATA:
  534             logmsg(4, "%lX received DATA %d %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  535             HASH_FIND_INT(active_streams, &header.stream_id, as);
  536             if(as == NULL || as->state != DATA) {
  537                 /* DATA without previous HEADER/CONT frame without END -> RST/PRTOCOL_ERROR */
  538                 logmsg(0, "http2: CONT without previous HEADERS frame or after END");
  539                 del_active(&active_streams, &closed_streams, header.stream_id);
  540                 put_reject(f_client, header.stream_id, E_PROTOCOL_ERROR, &err_jmp);
  541                 continue;
  542             }
  543             if(header.flags & 0x08) {
  544                 /* padding */
  545                 v1 = read_int(f_client, 1, &err_jmp);
  546                 v2 = 1;
  547             } else
  548                 v1 = v2 = 0;
  549             if(fread(content, 1, header.length - v2, f_client) != header.length) {
  550                 put_reject(f_client, header.stream_id, E_PROTOCOL_ERROR, &err_jmp);
  551                 del_active(&active_streams, &closed_streams, header.stream_id);
  552                 logmsg(0, "%lX http2: premature EOF %s:%d", pthread_self(), __FILE__, __LINE__);
  553                 return;
  554             }
  555             nn_send(as->s_be, content, header.length - v2 - v1, 0);
  556             if(v1 > 0)
  557                 fread(content, 1, v1, f_client);
  558             if(as->at_eos = (header.flags & 0x01)) {
  559                 nn_send(as->s_be, "", 0, 0);    /* no trailers */
  560                 as->state = REPLY;
  561             }
  562             break;
  563         case F_RST:
  564             logmsg(4, "%lX received RST %d %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  565             break;
  566         case F_SETTINGS:
  567             logmsg(4, "%lX received SETTINGS %d %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  568             if(header.flags & 0x1) {    /* ACK for what we sent */
  569                 logmsg(4, "%lX received ACK %d type %d length %d %s:%d", pthread_self(), header.stream_id, header.type, header.flags, __FILE__, __LINE__);
  570                 continue;
  571             }
  572             if(header.stream_id != 0) {
  573                 logmsg(4, "%lX non-zero stream_id %d %d %s:%d", pthread_self(), __FILE__, __LINE__);
  574                 last_stream = header.stream_id;
  575                 header.stream_id = 0;
  576                 header.type = F_GOAWAY;
  577                 header.flags = 0;
  578                 header.length = 8;
  579                 put_frame(&header, f_client, &err_jmp);
  580                 write_int(f_client, last_stream, 4, &err_jmp);
  581                 err = 0x1; /*protocol error */
  582                 write_int(f_client, err, 4, &err_jmp);
  583                 cleanup();
  584                 return;
  585             }
  586             for(n = header.length; n > 0; n -= 6) {
  587                 get_settings(&settings, f_client, &err_jmp);
  588                 if(settings.id == S_TABSIZE)
  589                     TABSIZE = settings.val;
  590                 else if(settings.id == S_MAXFRAME) {
  591                     FRAMESIZE = settings.val;
  592                     if((content = realloc(content, FRAMESIZE)) == NULL) {
  593                         logmsg(0, "HTTP2: Out of memory");
  594                         cleanup();
  595                         return;
  596                     }
  597                 } else if(settings.id == S_MAXSTREAMS)
  598                     MAXSTREAMS = settings.val;
  599                 else
  600                     logmsg(4, "%lX ignored settings %d/%d %s:%d", pthread_self(), settings.id, settings.val, __FILE__, __LINE__);
  601             }
  602             logmsg(3, "%lX TABSIZE %d FRAMESIZE %d MAXSTREAMS %d %s:%d", pthread_self(), TABSIZE, FRAMESIZE, MAXSTREAMS, __FILE__, __LINE__);
  603             /* ACK */
  604             header.length = 0;
  605             header.type = 0x04;
  606             header.flags = 0x01;
  607             header.stream_id = 0;
  608             put_frame(&header, f_client, &err_jmp);
  609             logmsg(3, "%lX ACK SETTINGS %s:%d", pthread_self(), __FILE__, __LINE__);
  610             break;
  611         case F_PING:
  612             logmsg(4, "%lX received PING %d %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  613             if(header.stream_id != 0 || header.length != 8) {
  614                 last_stream = header.stream_id;
  615                 header.stream_id = 0;
  616                 header.type = F_GOAWAY;
  617                 header.flags = 0;
  618                 header.length = 8;
  619                 put_frame(&header, f_client, &err_jmp);
  620                 write_int(f_client, last_stream, 4, &err_jmp);
  621                 err = 0x1; /*protocol error */
  622                 write_int(f_client, err, 4, &err_jmp);
  623                 cleanup();
  624                 return;
  625             }
  626             v1 = read_int(f_client, 4, &err_jmp);
  627             v2 = read_int(f_client, 4, &err_jmp);
  628             header.flags = 0x1; /* ACK */
  629             put_frame(&header, f_client, &err_jmp);
  630             write_int(f_client, v1, 4, &err_jmp);
  631             write_int(f_client, v2, 4, &err_jmp);
  632             break;
  633         case F_GOAWAY:
  634             logmsg(4, "%lX received GOAWAY %d %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  635             /* should really send a GOAWAY back */
  636             cleanup();
  637             return;
  638             // break;
  639         /* rejected - no push promise */
  640         case F_PUSH:
  641             logmsg(4, "%lX received PUSH %d %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  642             add_closed(&closed_streams, header.stream_id);
  643             put_reject(f_client, header.stream_id, E_REFUSED_STREAM, &err_jmp);
  644             break;
  645         /* acknowledged but otherwise ignored */
  646         case F_PRIORITY:
  647             logmsg(4, "%lX received PRIORITY %d length %d %s:%d", pthread_self(), header.stream_id, header.length, __FILE__, __LINE__);
  648             if(header.length != 5 || header.stream_id == 0) {
  649                 cleanup();
  650                 return;
  651             }
  652             v1 = read_int(f_client, 4, &err_jmp);
  653             v2 = read_int(f_client, 1, &err_jmp);
  654             logmsg(4, "%lX PRIORITY %d stream %d weight %d %s:%d", pthread_self(), header.stream_id, v1 & 0x7FFFFFFF, v2, __FILE__, __LINE__);
  655             if(header.stream_id == 0)
  656                 put_reject(f_client, 0, E_PROTOCOL_ERROR, &err_jmp);
  657             break;
  658         case F_WINUPD:
  659             logmsg(4, "%lX received WINUPD %d %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  660             if(header.length != 4) {
  661                 cleanup();
  662                 return;
  663             }
  664             v1 = read_int(f_client, 4, &err_jmp);
  665             break;
  666         default:
  667             logmsg(4, "%lX received UNKNOWN %d stream_id %d length %d flags %d %s:%d", pthread_self(), header.type, header.stream_id, header.length, header.flags, __FILE__, __LINE__);
  668             cleanup();
  669             return;
  670         }
  671         HASH_ITER(hh, active_streams, as, tas) {
  672             if(as->state == REPLY) {
  673                 for(v1 = 1; (header.length = nn_recv(as->s_be, &msg, NN_MSG, 0)) > 0; v1 = 0) {
  674                     header.stream_id = as->stream_id;
  675                     header.flags = 0;
  676                     header.type = (v1? F_HEADERS: F_CONT);
  677                     put_frame(&header, f_client, &err_jmp);
  678                     fwrite(msg, 1, header.length, f_client);
  679                     nn_freemsg(msg);
  680                     logmsg(4, "%lX write %d:%s -> %d %s:%d", pthread_self(), header.stream_id, v1? "HEADER": "CONT", header.length, __FILE__, __LINE__);
  681                 }
  682                 header.stream_id = as->stream_id;
  683                 header.length = 0;
  684                 header.flags = 0x04;
  685                 header.type = (v1? F_HEADERS: F_CONT);
  686                 put_frame(&header, f_client, &err_jmp);
  687                 logmsg(4, "%lX write %d -> end-headers %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  688                 while((header.length = nn_recv(as->s_be, &msg, NN_MSG, 0)) > 0) {
  689                     header.stream_id = as->stream_id;
  690                     header.flags = 0;
  691                     header.type = F_DATA;
  692                     put_frame(&header, f_client, &err_jmp);
  693                     fwrite(msg, 1, header.length, f_client);
  694                     nn_freemsg(msg);
  695                     logmsg(4, "%lX write %d:DATA -> %d %s:%d", pthread_self(), header.stream_id, header.length, __FILE__, __LINE__);
  696                 }
  697                 for(v1 = 1; (header.length = nn_recv(as->s_be, &msg, NN_MSG, 0)) > 0; v1 = 0) {
  698                     header.stream_id = as->stream_id;
  699                     header.flags = (v1? 0x01: 0);
  700                     header.type = (v1? F_HEADERS: F_CONT);
  701                     put_frame(&header, f_client, &err_jmp);
  702                     fwrite(msg, 1, header.length, f_client);
  703                     nn_freemsg(msg);
  704                     logmsg(4, "%lX write trailer %d:%s -> %d %s:%d", pthread_self(), header.stream_id, v1? "HEADER": "CONT", header.length, __FILE__, __LINE__);
  705                 }
  706                 header.stream_id = as->stream_id;
  707                 header.length = 0;
  708                 header.flags = (v1? 0x01 | 0x04: 0x04);
  709                 header.type = (v1? F_HEADERS: F_CONT);
  710                 put_frame(&header, f_client, &err_jmp);
  711                 logmsg(4, "%lX write trailer %d -> end-headers %s:%d", pthread_self(), header.stream_id, __FILE__, __LINE__);
  712 
  713                 /* end of comms; has to be done from this side, as no LINGER available */
  714                 nn_send(as->s_be, "", 0, 0);
  715                 /* sleep to make sure all messages are through before closing the channel */
  716                 t_wait.tv_sec = 0;
  717                 t_wait.tv_nsec = 1000000;
  718                 nanosleep(&t_wait, NULL);
  719 
  720                 nn_close(as->s_be);
  721 
  722                 del_active(&active_streams, &closed_streams, as->stream_id);
  723             }
  724         }
  725     }
  726 }