"Fossies" - the Fresh Open Source Software Archive

Member "snort3_extra-3.0.3-1/src/daqs/daq_socket/daq_socket.c" (23 Sep 2020, 22068 Bytes) of package /linux/misc/snort3_extra-3.0.3-1.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "daq_socket.c" see the Fossies "Dox" file reference documentation.

    1 /*--------------------------------------------------------------------------
    2 // Copyright (C) 2015-2020 Cisco and/or its affiliates. All rights reserved.
    3 //
    4 // This program is free software; you can redistribute it and/or modify it
    5 // under the terms of the GNU General Public License Version 2 as published
    6 // by the Free Software Foundation.  You may not use, modify or distribute
    7 // this program under any other version of the GNU General Public License.
    8 //
    9 // This program is distributed in the hope that it will be useful, but
   10 // WITHOUT ANY WARRANTY; without even the implied warranty of
   11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   12 // General Public License for more details.
   13 //
   14 // You should have received a copy of the GNU General Public License along
   15 // with this program; if not, write to the Free Software Foundation, Inc.,
   16 // 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
   17 //--------------------------------------------------------------------------
   18 */
   19 // daq_socket.c authors Russ Combs <rucombs@cisco.com> and Carter Waxman <cwaxman@cisco.com>
   20 
   21 #include <errno.h>
   22 #include <netinet/in.h>
   23 // putting types.h here because of Bug in FreeBSD
   24 #include <sys/types.h>
   25 #include <netinet/in_systm.h>
   26 #include <netinet/ip.h>
   27 #include <stdbool.h>
   28 #include <stdlib.h>
   29 #include <string.h>
   30 #include <stdio.h>
   31 #include <sys/socket.h>
   32 #include <sys/time.h>
   33 #include <unistd.h>
   34 
   35 #include <daq_module_api.h>
   36 #include <daq_dlt.h>
   37 
   38 #include <daqs/daq_user.h>
   39 
   40 #define DAQ_MOD_VERSION 1
   41 #define DAQ_NAME "socket"
   42 #define DAQ_TYPE (DAQ_TYPE_INTF_CAPABLE | DAQ_TYPE_INLINE_CAPABLE | DAQ_TYPE_MULTI_INSTANCE)
   43 #define DEFAULT_PORT 8000
   44 #define DEFAULT_POOL_SIZE 16
   45 
   46 // FIXIT-M this should be defined by daq_module_api.h
   47 #define SET_ERROR(mod_inst, ...) daq_base_api.set_errbuf(mod_inst, __VA_ARGS__)
   48 
   49 typedef struct _SocketMsgDesc
   50 {
   51     DAQ_Msg_t msg;
   52     DAQ_PktHdr_t pkt_hdr;
   53     DAQ_UsrHdr_t pci;
   54     struct _SocketMsgDesc* next;
   55 } SocketMsgDesc;
   56 
   57 typedef struct
   58 {
   59     SocketMsgDesc* pool;
   60     SocketMsgDesc* free_list;
   61     DAQ_MsgPoolInfo_t info;
   62 } SocketMsgPool;
   63 
   64 typedef struct
   65 {
   66     DAQ_ModuleInstance_h mod_inst;
   67 
   68     struct sockaddr_in sin_a;
   69     struct sockaddr_in sin_b;
   70 
   71     DAQ_Stats_t stats;
   72 
   73     SocketMsgPool pool;
   74 
   75     int sock_a;  // recv from b
   76     int sock_b;  // recv from a
   77     int sock_c;  // connect
   78 
   79     int use_a;
   80     int port;
   81     int passive;
   82 
   83     unsigned timeout;
   84     unsigned snaplen;
   85 
   86     uint8_t ip_proto;
   87 
   88     volatile bool interrupted;
   89 } SocketContext;
   90 
   91 static DAQ_BaseAPI_t daq_base_api;
   92 
   93 static DAQ_VariableDesc_t socket_variable_descriptions[] =
   94 {
   95     { "port", "Port number to use for connecting to socket", DAQ_VAR_DESC_REQUIRES_ARGUMENT },
   96     { "proto", "Transport protocol to use for connecting to socket", DAQ_VAR_DESC_REQUIRES_ARGUMENT },
   97 };
   98 
   99 static int create_message_pool(SocketContext* sc, unsigned size)
  100 {
  101     SocketMsgPool* pool = &sc->pool;
  102     pool->pool = calloc(sizeof(SocketMsgDesc), size);
  103     if (!pool->pool)
  104     {
  105         SET_ERROR(sc->mod_inst, "%s: Could not allocate %zu bytes for a packet descriptor pool!",
  106                 __func__, sizeof(SocketMsgDesc) * size);
  107         return DAQ_ERROR_NOMEM;
  108     }
  109     pool->info.mem_size = sizeof(SocketMsgDesc) * size;
  110     pool->free_list = NULL;
  111     while (pool->info.size < size)
  112     {
  113         /* Allocate packet data and set up descriptor */
  114         SocketMsgDesc* desc = &pool->pool[pool->info.size];
  115         desc->msg.data = malloc(sc->snaplen);
  116         if (!desc->msg.data)
  117         {
  118             SET_ERROR(sc->mod_inst, "%s: Could not allocate %d bytes for a packet descriptor message buffer!",
  119                     __func__, sc->snaplen);
  120             return DAQ_ERROR_NOMEM;
  121         }
  122         pool->info.mem_size += sc->snaplen;
  123         desc->pci.ip_proto = sc->ip_proto;
  124 
  125         /* Initialize non-zero invariant packet header fields. */
  126         DAQ_PktHdr_t* pkt_hdr = &desc->pkt_hdr;
  127         pkt_hdr->address_space_id = 0;
  128         pkt_hdr->ingress_index = DAQ_PKTHDR_UNKNOWN;
  129         pkt_hdr->ingress_group = DAQ_PKTHDR_UNKNOWN;
  130         pkt_hdr->egress_index = DAQ_PKTHDR_UNKNOWN;
  131         pkt_hdr->egress_group = DAQ_PKTHDR_UNKNOWN;
  132         pkt_hdr->flags = 0;
  133         pkt_hdr->opaque = 0;
  134 
  135         /* Initialize non-zero invariant message header fields. */
  136         DAQ_Msg_t* msg = &desc->msg;
  137         msg->priv = desc;
  138         msg->type = DAQ_MSG_TYPE_PACKET;
  139         msg->hdr_len = sizeof(*pkt_hdr);
  140         msg->hdr = pkt_hdr;
  141 
  142         /* Place it on the free list */
  143         desc->next = pool->free_list;
  144         pool->free_list = desc;
  145 
  146         pool->info.size++;
  147     }
  148     pool->info.available = pool->info.size;
  149     return DAQ_SUCCESS;
  150 }
  151 
  152 //-------------------------------------------------------------------------
  153 // socket functions
  154 //-------------------------------------------------------------------------
  155 
  156 static int sock_setup(SocketContext* socket_context)
  157 {
  158     struct sockaddr_in sin;
  159 
  160     if ((socket_context->sock_c = socket(PF_INET, SOCK_STREAM, 0)) == -1)
  161     {
  162         char error_msg[1024] = {0};
  163         strerror_r(errno, error_msg, sizeof(error_msg));
  164         SET_ERROR(socket_context->mod_inst, "%s: can't create listener socket (%s)\n", __func__, error_msg);
  165         return -1;
  166     }
  167 
  168     sin.sin_family = PF_INET;
  169     sin.sin_addr.s_addr = INADDR_ANY;
  170     sin.sin_port = htons(socket_context->port);
  171 
  172     if (bind(socket_context->sock_c, (struct sockaddr*)&sin, sizeof(sin)) == -1)
  173     {
  174         char error_msg[1024] = {0};
  175         strerror_r(errno, error_msg, sizeof(error_msg));
  176         SET_ERROR(socket_context->mod_inst, "%s: can't bind listener socket (%s)\n", __func__, error_msg);
  177         return -1;
  178     }
  179 
  180     if (listen(socket_context->sock_c, 2) == -1)
  181     {
  182         char error_msg[1024] = {0};
  183         strerror_r(errno, error_msg, sizeof(error_msg));
  184         SET_ERROR(socket_context->mod_inst, "%s: can't listen on socket (%s)\n", __func__, error_msg);
  185         return -1;
  186     }
  187     return 0;
  188 }
  189 
  190 static void sock_cleanup(SocketContext* socket_context)
  191 {
  192     if (socket_context->sock_c >= 0)
  193         close(socket_context->sock_c);
  194 
  195     if (socket_context->sock_a >= 0)
  196         close(socket_context->sock_a);
  197 
  198     if (socket_context->sock_b >= 0)
  199         close(socket_context->sock_b);
  200 
  201     socket_context->sock_c = socket_context->sock_a = socket_context->sock_b = -1;
  202 }
  203 
  204 static int sock_recv(SocketContext* socket_context, SocketMsgDesc* desc, int* sock)
  205 {
  206     int n = recv(*sock, desc->msg.data, socket_context->snaplen, 0);
  207 
  208     if (n <= 0)
  209     {
  210         if (errno != EINTR)
  211         {
  212             char error_msg[1024] = {0};
  213             strerror_r(errno, error_msg, sizeof(error_msg));
  214             SET_ERROR(socket_context->mod_inst, "%s: can't recv from socket (%s)\n", __func__, error_msg);
  215             desc->pci.flags = DAQ_USR_FLAG_END_FLOW;
  216             *sock = -1;
  217         }
  218         return 0;
  219     }
  220     return n;
  221 }
  222 
  223 static int sock_send(SocketContext* socket_context, int sock, const uint8_t* buf, uint32_t len)
  224 {
  225     if (sock < 0)
  226         return 0;
  227 
  228     int n = send(sock, buf, len, 0);
  229 
  230     while (0 <= n && (uint32_t)n < len)
  231     {
  232         buf += n;
  233         len -= n;
  234         n = send(sock, buf, len, 0);
  235     }
  236     if (n == -1)
  237     {
  238         char error_msg[1024] = {0};
  239         strerror_r(errno, error_msg, sizeof(error_msg));
  240         SET_ERROR(socket_context->mod_inst, "%s: can't send on socket (%s)\n", __func__, error_msg);
  241         return -1;
  242     }
  243     return 0;
  244 }
  245 
  246 static int sock_accept(SocketContext* socket_context, SocketMsgDesc* desc, int* sock, struct sockaddr_in* psin)
  247 {
  248     const char* banner;
  249     socklen_t len = sizeof(*psin);
  250     *sock = accept(socket_context->sock_c, (struct sockaddr*)psin, &len);
  251 
  252     if (*sock == -1)
  253     {
  254         char error_msg[1024] = {0};
  255         strerror_r(errno, error_msg, sizeof(error_msg));
  256         SET_ERROR(socket_context->mod_inst, "%s: can't accept incoming connection (%s)\n", __func__, error_msg);
  257         return -1;
  258     }
  259     banner = socket_context->use_a ? "client\n" : "server\n";
  260     sock_send(socket_context, *sock, (const uint8_t*)banner, 7);
  261 
  262     desc->pci.flags = DAQ_USR_FLAG_START_FLOW;
  263     return 0;
  264 }
  265 
  266 static int sock_poll(SocketContext* socket_context, SocketMsgDesc* desc, int* sock, struct sockaddr_in* psin)
  267 {
  268     int max_fd;
  269     fd_set inputs;
  270 
  271     if (socket_context->sock_c < 0)
  272         return 0;
  273 
  274     FD_ZERO(&inputs);
  275     FD_SET(socket_context->sock_c, &inputs);
  276     max_fd = socket_context->sock_c;
  277 
  278     if (*sock > 0)
  279     {
  280         FD_SET(*sock, &inputs);
  281 
  282         if (*sock > max_fd)
  283             max_fd = *sock;
  284     }
  285 
  286     struct timeval timeout;
  287     timeout.tv_sec = 1;
  288     timeout.tv_usec = 0;
  289 
  290     if (!select(max_fd+1, &inputs, NULL, NULL, &timeout))
  291         return 0;
  292 
  293     else if (*sock >= 0 && FD_ISSET(*sock, &inputs))
  294         return sock_recv(socket_context, desc, sock);
  295 
  296     else if (*sock < 0 && FD_ISSET(socket_context->sock_c, &inputs))
  297         return sock_accept(socket_context, desc, sock, psin);
  298 
  299     return 0;
  300 }
  301 
  302 //-------------------------------------------------------------------------
  303 // daq utilities
  304 //-------------------------------------------------------------------------
  305 
  306 static int socket_daq_module_load(const DAQ_BaseAPI_t* base_api)
  307 {
  308     if (base_api->api_version != DAQ_BASE_API_VERSION || base_api->api_size != sizeof(DAQ_BaseAPI_t))
  309         return DAQ_ERROR;
  310 
  311     daq_base_api = *base_api;
  312 
  313     return DAQ_SUCCESS;
  314 }
  315 
  316 static int socket_daq_module_unload()
  317 {
  318     memset(&daq_base_api, 0, sizeof(daq_base_api));
  319     return DAQ_SUCCESS;
  320 }
  321 
  322 static int socket_daq_get_variable_descs(const DAQ_VariableDesc_t** var_desc_table)
  323 {
  324     *var_desc_table = socket_variable_descriptions;
  325 
  326     return sizeof(socket_variable_descriptions) / sizeof(DAQ_VariableDesc_t);
  327 }
  328 
  329 static void clear(SocketContext* socket_context)
  330 {
  331     if (socket_context->sock_a < 0)
  332     {
  333         socket_context->sin_a.sin_addr.s_addr = 0;
  334         socket_context->sin_a.sin_port = 0;
  335     }
  336     if (socket_context->sock_b < 0)
  337     {
  338         socket_context->sin_b.sin_addr.s_addr = 0;
  339         socket_context->sin_b.sin_port = 0;
  340     }
  341 }
  342 
  343 static void set_pkt_hdr(SocketContext* socket_context, SocketMsgDesc* desc, ssize_t len)
  344 {
  345     struct timeval t;
  346     gettimeofday(&t, NULL);
  347 
  348     DAQ_PktHdr_t* pkt_hdr = &desc->pkt_hdr;
  349     pkt_hdr->ts.tv_sec = t.tv_sec;
  350     pkt_hdr->ts.tv_usec = t.tv_usec;
  351     pkt_hdr->pktlen = len;
  352 
  353     // use_a already toggled
  354     if (socket_context->use_a)
  355     {
  356         desc->pci.src_addr = socket_context->sin_b.sin_addr.s_addr;
  357         desc->pci.dst_addr = socket_context->sin_a.sin_addr.s_addr;
  358         desc->pci.src_port = socket_context->sin_b.sin_port;
  359         desc->pci.dst_port = socket_context->sin_a.sin_port;
  360         desc->pci.flags &= ~DAQ_USR_FLAG_TO_SERVER;
  361     }
  362     else
  363     {
  364         desc->pci.src_addr = socket_context->sin_a.sin_addr.s_addr;
  365         desc->pci.dst_addr = socket_context->sin_b.sin_addr.s_addr;
  366         desc->pci.src_port = socket_context->sin_a.sin_port;
  367         desc->pci.dst_port = socket_context->sin_b.sin_port;
  368         desc->pci.flags |= DAQ_USR_FLAG_TO_SERVER;
  369     }
  370 
  371     if (desc->pci.flags & DAQ_USR_FLAG_END_FLOW)
  372         clear(socket_context);
  373 }
  374 
  375 // forward all but drops, retries and blacklists:
  376 static const int s_fwd[MAX_DAQ_VERDICT] = { 1, 0, 1, 1, 0, 1, 0 };
  377 
  378 static unsigned socket_daq_read_message(
  379     SocketContext* socket_context, SocketMsgDesc* desc, DAQ_RecvStatus* rstat)
  380 {
  381     int* sock = socket_context->use_a ? &socket_context->sock_a : &socket_context->sock_b;
  382     struct sockaddr_in* psin = socket_context->use_a ? &socket_context->sin_a : &socket_context->sin_b;
  383     desc->pci.flags = 0;
  384 
  385     unsigned size = sock_poll(socket_context, desc, sock, psin);
  386 
  387     // don't toggle w/o at least one connection so client is always 1st
  388     if (socket_context->sock_a > -1 || socket_context->sock_b > -1)
  389         socket_context->use_a = !socket_context->use_a;
  390 
  391     if (*rstat != DAQ_RSTAT_OK && !desc->pci.flags)
  392         return 0;
  393 
  394     set_pkt_hdr(socket_context, desc, size);
  395 
  396     return size;
  397 }
  398 
  399 static int socket_daq_config(SocketContext* socket_context, const DAQ_ModuleConfig_h cfg)
  400 {
  401     const char* var_key, * var_value;
  402     daq_base_api.config_first_variable(cfg, &var_key, &var_value);
  403 
  404     if (var_key)
  405     {
  406         char* end = NULL;
  407         socket_context->port = (int)strtol(var_key, &end, 0);
  408     }
  409 
  410     while (var_key)
  411     {
  412         if (!strcmp(var_key, "port"))
  413         {
  414             char* end = NULL;
  415             socket_context->port = (int)strtol(var_value, &end, 0);
  416 
  417             if (*end || socket_context->port <= 0 || socket_context->port > 65535)
  418             {
  419                 SET_ERROR(socket_context->mod_inst, "%s: bad port (%s)\n", __func__, var_value);
  420                 return DAQ_ERROR;
  421             }
  422         }
  423         else if (!strcmp(var_key, "proto"))
  424         {
  425             if (!strcmp(var_value, "tcp"))
  426                 socket_context->ip_proto = IPPROTO_TCP;
  427             else if (!strcmp(var_value, "udp"))
  428                 socket_context->ip_proto = IPPROTO_UDP;
  429             else
  430             {
  431                 SET_ERROR(socket_context->mod_inst, "%s: bad proto (%s)\n", __func__, var_value);
  432                 return DAQ_ERROR;
  433             }
  434         }
  435         else
  436         {
  437             SET_ERROR(socket_context->mod_inst, "%s: Unknown variable name: '%s'", DAQ_NAME, var_key);
  438             return DAQ_ERROR_INVAL;
  439         }
  440 
  441         daq_base_api.config_next_variable(cfg, &var_key, &var_value);
  442     }
  443 
  444     if (!socket_context->ip_proto)
  445         socket_context->ip_proto = IPPROTO_TCP;
  446 
  447     if (!socket_context->port)
  448         socket_context->port = DEFAULT_PORT;
  449 
  450     socket_context->snaplen = daq_base_api.config_get_snaplen(cfg) ?
  451         daq_base_api.config_get_snaplen(cfg) : IP_MAXPACKET;
  452 
  453     socket_context->timeout = daq_base_api.config_get_timeout(cfg);
  454     socket_context->passive = (daq_base_api.config_get_mode(cfg) == DAQ_MODE_PASSIVE);
  455 
  456     return DAQ_SUCCESS;
  457 }
  458 
  459 //-------------------------------------------------------------------------
  460 // daq
  461 //-------------------------------------------------------------------------
  462 
  463 static void socket_daq_destroy(void* handle)
  464 {
  465     SocketContext* socket_context = (SocketContext*) handle;
  466 
  467     SocketMsgPool* pool = &socket_context->pool;
  468     if (pool->pool)
  469     {
  470         while (pool->info.size > 0)
  471             free(pool->pool[--pool->info.size].msg.data);
  472         free(pool->pool);
  473         pool->pool = NULL;
  474     }
  475     pool->free_list = NULL;
  476     pool->info.available = 0;
  477     pool->info.mem_size = 0;
  478 
  479     free(socket_context);
  480 }
  481 
  482 static int socket_daq_instantiate(const DAQ_ModuleConfig_h cfg, DAQ_ModuleInstance_h mod_inst, void** handle)
  483 {
  484     SocketContext* socket_context = calloc(1, sizeof(*socket_context));
  485 
  486     if (!socket_context)
  487     {
  488         SET_ERROR(mod_inst, "%s: failed to allocate the socket context!", __func__);
  489         return DAQ_ERROR_NOMEM;
  490     }
  491 
  492     if (socket_daq_config(socket_context, cfg) != DAQ_SUCCESS)
  493     {
  494         socket_daq_destroy(socket_context);
  495         return DAQ_ERROR;
  496     }
  497 
  498     uint32_t pool_size = daq_base_api.config_get_msg_pool_size(cfg);
  499     if (pool_size == 0)
  500         pool_size = DEFAULT_POOL_SIZE;
  501 
  502     int rval = create_message_pool(socket_context, pool_size);
  503     if (rval != DAQ_SUCCESS)
  504     {
  505         socket_daq_destroy(socket_context);
  506         return rval;
  507     }
  508 
  509     socket_context->mod_inst = mod_inst;
  510     socket_context->sock_c = socket_context->sock_a = socket_context->sock_b = -1;
  511     socket_context->use_a = 1;
  512 
  513     *handle = socket_context;
  514     return DAQ_SUCCESS;
  515 }
  516 
  517 static int socket_daq_start(void* handle)
  518 {
  519     SocketContext* socket_context = (SocketContext*) handle;
  520 
  521     if (sock_setup(socket_context))
  522         return DAQ_ERROR;
  523 
  524     return DAQ_SUCCESS;
  525 }
  526 
  527 static int socket_daq_stop(void* handle)
  528 {
  529     SocketContext* socket_context = (SocketContext*) handle;
  530     sock_cleanup(socket_context);
  531     return DAQ_SUCCESS;
  532 }
  533 
  534 static int socket_ioctl(void* handle, DAQ_IoctlCmd cmd, void* arg, size_t arglen)
  535 {
  536     (void) handle;
  537 
  538     if (cmd == DIOCTL_QUERY_USR_PCI)
  539     {
  540         if (arglen != sizeof(DIOCTL_QueryUsrPCI))
  541             return DAQ_ERROR_INVAL;
  542 
  543         DIOCTL_QueryUsrPCI* qup = (DIOCTL_QueryUsrPCI*)arg;
  544 
  545         if (!qup->msg)
  546             return DAQ_ERROR_INVAL;
  547 
  548         SocketMsgDesc* desc = (SocketMsgDesc*) qup->msg->priv;
  549         qup->pci = &desc->pci;
  550 
  551         return DAQ_SUCCESS;
  552     }
  553     return DAQ_ERROR_NOTSUP;
  554 }
  555 
  556 static int socket_daq_inject(void* handle, DAQ_MsgType type, const void* hdr, const uint8_t* buf, uint32_t len)
  557 {
  558     (void) hdr;
  559 
  560     if (type != DAQ_MSG_TYPE_PAYLOAD)
  561         return DAQ_ERROR_NOTSUP;
  562 
  563     SocketContext* socket_context = (SocketContext*) handle;
  564     int egress = socket_context->use_a ? socket_context->sock_a : socket_context->sock_b;
  565     int status = sock_send(socket_context, egress, buf, len);
  566 
  567     if (status)
  568         return DAQ_ERROR;
  569 
  570     socket_context->stats.packets_injected++;
  571     return DAQ_SUCCESS;
  572 }
  573 
  574 static int socket_daq_inject_relative(void* handle, const DAQ_Msg_t* msg, const uint8_t* buf, uint32_t len, int reverse)
  575 {
  576     SocketContext* socket_context = (SocketContext*) handle;
  577     int egress;
  578 
  579     (void) msg;
  580 
  581     if (reverse)
  582         egress = socket_context->use_a ? socket_context->sock_b : socket_context->sock_a;
  583     else
  584         egress = socket_context->use_a ? socket_context->sock_a : socket_context->sock_b;
  585 
  586     int status = sock_send(socket_context, egress, buf, len);
  587 
  588     if (status)
  589         return DAQ_ERROR;
  590 
  591     socket_context->stats.packets_injected++;
  592     return DAQ_SUCCESS;
  593 }
  594 
  595 static unsigned socket_daq_msg_receive(void* handle, const unsigned max_recv, const DAQ_Msg_t* msgs[], DAQ_RecvStatus* rstat)
  596 {
  597     SocketContext* socket_context = (SocketContext*) handle;
  598     unsigned idx = 0, miss = 0;
  599 
  600     *rstat = DAQ_RSTAT_OK;
  601 
  602     while (idx < max_recv && ++miss < 2)
  603     {
  604         if (socket_context->interrupted)
  605         {
  606             socket_context->interrupted = false;
  607             *rstat = DAQ_RSTAT_INTERRUPTED;
  608             break;
  609         }
  610 
  611         SocketMsgDesc* desc = socket_context->pool.free_list;
  612         if (!desc)
  613         {
  614             *rstat = DAQ_RSTAT_NOBUF;
  615             break;
  616         }
  617 
  618         unsigned size = socket_daq_read_message(socket_context, desc, rstat);
  619         if (*rstat != DAQ_RSTAT_OK)
  620             break;
  621 
  622         if (size)
  623         {
  624             desc->msg.data_len = size;
  625             socket_context->pool.free_list = desc->next;
  626             desc->next = NULL;
  627             socket_context->pool.info.available--;
  628             msgs[idx] = &desc->msg;
  629             idx++;
  630 
  631             miss = 0;
  632         }
  633     }
  634 
  635     return idx;
  636 }
  637 
  638 static int socket_daq_msg_finalize(void* handle, const DAQ_Msg_t* msg, DAQ_Verdict verdict)
  639 {
  640     SocketContext* socket_context = (SocketContext*) handle;
  641     SocketMsgDesc* desc = (SocketMsgDesc*) msg->priv;
  642 
  643     if (verdict >= MAX_DAQ_VERDICT)
  644         verdict = DAQ_VERDICT_BLOCK;
  645 
  646     socket_context->stats.verdicts[verdict]++;
  647 
  648     if (socket_context->passive || s_fwd[verdict])
  649     {
  650         // already toggled use_a, so we get a->b or b->a
  651         int egress = socket_context->use_a ? socket_context->sock_a : socket_context->sock_b;
  652 
  653         if (sock_send(socket_context, egress, desc->msg.data, msg->data_len))
  654             return DAQ_ERROR;
  655     }
  656 
  657     desc->next = socket_context->pool.free_list;
  658     socket_context->pool.free_list = desc;
  659     socket_context->pool.info.available++;
  660     return DAQ_SUCCESS;
  661 }
  662 
  663 static int socket_daq_interrupt(void* handle)
  664 {
  665     SocketContext* socket_context = (SocketContext*) handle;
  666     socket_context->interrupted = true;
  667     return DAQ_SUCCESS;
  668 }
  669 
  670 static int socket_daq_get_stats(void* handle, DAQ_Stats_t* stats)
  671 {
  672     SocketContext* socket_context = (SocketContext*) handle;
  673     *stats = socket_context->stats;
  674     return DAQ_SUCCESS;
  675 }
  676 
  677 static void socket_daq_reset_stats(void* handle)
  678 {
  679     SocketContext* socket_context = (SocketContext*) handle;
  680     memset(&socket_context->stats, 0, sizeof(socket_context->stats));
  681 }
  682 
  683 static int socket_daq_get_snaplen(void* handle)
  684 {
  685     SocketContext* socket_context = (SocketContext*) handle;
  686     return socket_context->snaplen;
  687 }
  688 
  689 static uint32_t socket_daq_get_capabilities(void* handle)
  690 {
  691     (void) handle;
  692     return DAQ_CAPA_BLOCK | DAQ_CAPA_REPLACE | DAQ_CAPA_INJECT | DAQ_CAPA_INJECT_RAW
  693         | DAQ_CAPA_INTERRUPT | DAQ_CAPA_UNPRIV_START;
  694 }
  695 
  696 static int socket_daq_get_datalink_type(void* handle)
  697 {
  698     (void) handle;
  699     return DLT_USER;
  700 }
  701 
  702 static int socket_daq_set_filter(void* handle, const char* filter)
  703 {
  704     (void) handle;
  705     (void) filter;
  706     return DAQ_ERROR_NOTSUP;
  707 }
  708 
  709 static int socket_daq_get_msg_pool_info(void* handle, DAQ_MsgPoolInfo_t* info)
  710 {
  711     SocketContext* socket_context = (SocketContext*) handle;
  712     *info = socket_context->pool.info;
  713     return DAQ_SUCCESS;
  714 }
  715 
  716 //-------------------------------------------------------------------------
  717 
  718 DAQ_SO_PUBLIC DAQ_ModuleAPI_t DAQ_MODULE_DATA =
  719 {
  720     .api_version = DAQ_MODULE_API_VERSION,
  721     .api_size = sizeof(DAQ_ModuleAPI_t),
  722     .module_version = DAQ_MOD_VERSION,
  723     .name = DAQ_NAME,
  724     .type = DAQ_TYPE,
  725     .load = socket_daq_module_load,
  726     .unload = socket_daq_module_unload,
  727     .get_variable_descs = socket_daq_get_variable_descs,
  728     .instantiate = socket_daq_instantiate,
  729     .destroy = socket_daq_destroy,
  730     .set_filter = socket_daq_set_filter,
  731     .start = socket_daq_start,
  732     .inject = socket_daq_inject,
  733     .inject_relative = socket_daq_inject_relative,
  734     .interrupt = socket_daq_interrupt,
  735     .stop = socket_daq_stop,
  736     .ioctl = socket_ioctl,
  737     .get_stats = socket_daq_get_stats,
  738     .reset_stats = socket_daq_reset_stats,
  739     .get_snaplen = socket_daq_get_snaplen,
  740     .get_capabilities = socket_daq_get_capabilities,
  741     .get_datalink_type = socket_daq_get_datalink_type,
  742     .config_load = NULL,
  743     .config_swap = NULL,
  744     .config_free = NULL, 
  745     .msg_receive = socket_daq_msg_receive,
  746     .msg_finalize = socket_daq_msg_finalize,
  747     .get_msg_pool_info = socket_daq_get_msg_pool_info,
  748 };