"Fossies" - the Fresh Open Source Software Archive

Member "bind-9.17.5/lib/isc/netmgr/tcp.c" (4 Sep 2020, 30847 Bytes) of package /linux/misc/dns/bind9/9.17.5/bind-9.17.5.tar.xz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "tcp.c" see the Fossies "Dox" file reference documentation and the last Fossies "Diffs" side-by-side code changes report: 9.17.3_vs_9.17.4.

/*
 * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * See the COPYRIGHT file distributed with this work for additional
 * information regarding copyright ownership.
 */

#include <libgen.h>
#include <unistd.h>
#include <uv.h>

#include <isc/atomic.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/log.h>
#include <isc/magic.h>
#include <isc/mem.h>
#include <isc/netmgr.h>
#include <isc/quota.h>
#include <isc/random.h>
#include <isc/refcount.h>
#include <isc/region.h>
#include <isc/result.h>
#include <isc/sockaddr.h>
#include <isc/stdtime.h>
#include <isc/thread.h>
#include <isc/util.h>

#include "netmgr-int.h"
#include "uv-compat.h"

static atomic_uint_fast32_t last_tcpquota_log = ATOMIC_VAR_INIT(0);

/*
 * Rate-limit TCP quota log messages to at most one per second:
 * atomically swap in the current timestamp (seconds) and log only
 * if it differs from the previous one.
 */
static bool
can_log_tcp_quota(void) {
    isc_stdtime_t now, last;

    isc_stdtime_get(&now);
    last = atomic_exchange_relaxed(&last_tcpquota_log, now);
    if (now != last) {
        return (true);
    }

    return (false);
}

static int
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);

static void
tcp_close_direct(isc_nmsocket_t *sock);

static isc_result_t
tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);

static void
tcp_connect_cb(uv_connect_t *uvreq, int status);

static void
tcp_connection_cb(uv_stream_t *server, int status);

static void
read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf);

static void
tcp_close_cb(uv_handle_t *uvhandle);

static void
tcp_listenclose_cb(uv_handle_t *handle);

static isc_result_t
accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota);

static void
quota_accept_cb(isc_quota_t *quota, void *sock0);

static int
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
    isc__networker_t *worker = NULL;
    int r;

    REQUIRE(isc__nm_in_netthread());

    worker = &sock->mgr->workers[isc_nm_tid()];

    r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
    if (r != 0) {
        isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
        /* Socket was never opened; no need for tcp_close_direct() */
        atomic_store(&sock->closed, true);
        sock->result = isc__nm_uverr2result(r);
        atomic_store(&sock->connect_error, true);
        return (r);
    }

    if (req->local.length != 0) {
        r = uv_tcp_bind(&sock->uv_handle.tcp, &req->local.type.sa, 0);
        if (r != 0) {
            isc__nm_incstats(sock->mgr,
                     sock->statsindex[STATID_BINDFAIL]);
            sock->result = isc__nm_uverr2result(r);
            atomic_store(&sock->connect_error, true);
            tcp_close_direct(sock);
            return (r);
        }
    }

    uv_handle_set_data(&sock->uv_handle.handle, sock);
    r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp,
               &req->peer.type.sa, tcp_connect_cb);
    if (r != 0) {
        isc__nm_incstats(sock->mgr,
                 sock->statsindex[STATID_CONNECTFAIL]);
        sock->result = isc__nm_uverr2result(r);
        atomic_store(&sock->connect_error, true);
        tcp_close_direct(sock);
    }
    return (r);
}

void
isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
    isc__netievent_tcpconnect_t *ievent =
        (isc__netievent_tcpconnect_t *)ev0;
    isc_nmsocket_t *sock = ievent->sock;
    isc__nm_uvreq_t *req = ievent->req;
    int r;

    UNUSED(worker);

    r = tcp_connect_direct(sock, req);
    if (r != 0) {
        /* We need to issue callbacks ourselves */
        tcp_connect_cb(&req->uv_req.connect, r);
        goto done;
    }

    atomic_store(&sock->connected, true);

done:
    LOCK(&sock->lock);
    SIGNAL(&sock->cond);
    UNLOCK(&sock->lock);
}

static void
tcp_connect_cb(uv_connect_t *uvreq, int status) {
    isc__nm_uvreq_t *req = (isc__nm_uvreq_t *)uvreq->data;
    isc_nmsocket_t *sock = NULL;

    sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);

    REQUIRE(VALID_UVREQ(req));

    if (status == 0) {
        isc_result_t result;
        struct sockaddr_storage ss;
        isc_nmhandle_t *handle = NULL;

        sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
        isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
        uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss,
                   &(int){ sizeof(ss) });
        result = isc_sockaddr_fromsockaddr(&sock->peer,
                           (struct sockaddr *)&ss);
        RUNTIME_CHECK(result == ISC_R_SUCCESS);

        handle = isc__nmhandle_get(sock, NULL, NULL);
        req->cb.connect(handle, ISC_R_SUCCESS, req->cbarg);

        isc__nm_uvreq_put(&req, sock);

        /*
         * The sock is now attached to the handle.
         */
        isc__nmsocket_detach(&sock);

        /*
         * If the connect callback wants to hold on to the handle,
         * it needs to attach to it.
         */
        isc_nmhandle_unref(handle);
    } else {
        /*
         * TODO:
         * Handle the connect error properly and free the socket.
         */
        req->cb.connect(NULL, isc__nm_uverr2result(status), req->cbarg);
        isc__nm_uvreq_put(&req, sock);
    }
}

isc_result_t
isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
          isc_nm_cb_t cb, void *cbarg, size_t extrahandlesize) {
    isc_nmsocket_t *nsock = NULL, *tmp = NULL;
    isc__netievent_tcpconnect_t *ievent = NULL;
    isc__nm_uvreq_t *req = NULL;
    isc_result_t result = ISC_R_SUCCESS;

    REQUIRE(VALID_NM(mgr));

    nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
    isc__nmsocket_init(nsock, mgr, isc_nm_tcpsocket, local);
    nsock->extrahandlesize = extrahandlesize;
    nsock->result = ISC_R_SUCCESS;

    req = isc__nm_uvreq_get(mgr, nsock);
    req->cb.connect = cb;
    req->cbarg = cbarg;
    req->peer = peer->addr;

    ievent = isc__nm_get_ievent(mgr, netievent_tcpconnect);
    ievent->sock = nsock;
    ievent->req = req;

    /*
     * Async callbacks can dereference the socket in the meantime, so
     * we need to hold an additional reference to it.
     */
    isc__nmsocket_attach(nsock, &tmp);

    if (isc__nm_in_netthread()) {
        nsock->tid = isc_nm_tid();
        isc__nm_async_tcpconnect(&mgr->workers[nsock->tid],
                     (isc__netievent_t *)ievent);
        isc__nm_put_ievent(mgr, ievent);
    } else {
        nsock->tid = isc_random_uniform(mgr->nworkers);
        isc__nm_enqueue_ievent(&mgr->workers[nsock->tid],
                       (isc__netievent_t *)ievent);

        LOCK(&nsock->lock);
        while (!atomic_load(&nsock->connected) &&
               !atomic_load(&nsock->connect_error)) {
            WAIT(&nsock->cond, &nsock->lock);
        }
        UNLOCK(&nsock->lock);
    }

    if (nsock->result != ISC_R_SUCCESS) {
        result = nsock->result;
        isc__nmsocket_detach(&nsock);
    }

    isc__nmsocket_detach(&tmp);

    return (result);
}
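
/*
 * Usage sketch (illustrative only, not from the BIND sources): how a
 * caller might drive isc_nm_tcpconnect() with the isc_nm_cb_t callback
 * shape used above.  The manager setup and the way the isc_nmiface_t
 * structures are filled in are assumptions for illustration.
 *
 *   static void
 *   connect_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
 *       if (result != ISC_R_SUCCESS) {
 *           return; // connection failed; 'handle' is NULL in this case
 *       }
 *       // To keep the handle beyond this callback, attach to it here;
 *       // otherwise it is unreffed after the callback returns.
 *   }
 *
 *   isc_result_t result;
 *   isc_nmiface_t local, peer; // hypothetical: filled in by the caller
 *
 *   result = isc_nm_tcpconnect(mgr, &local, &peer, connect_cb, NULL, 0);
 *   // A nonzero extrahandlesize would reserve per-handle storage.
 */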

isc_result_t
isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
         isc_nm_accept_cb_t accept_cb, void *accept_cbarg,
         size_t extrahandlesize, int backlog, isc_quota_t *quota,
         isc_nmsocket_t **sockp) {
    isc_nmsocket_t *nsock = NULL;
    isc__netievent_tcplisten_t *ievent = NULL;

    REQUIRE(VALID_NM(mgr));

    nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
    isc__nmsocket_init(nsock, mgr, isc_nm_tcplistener, iface);
    nsock->accept_cb.accept = accept_cb;
    nsock->accept_cbarg = accept_cbarg;
    nsock->extrahandlesize = extrahandlesize;
    nsock->backlog = backlog;
    nsock->result = ISC_R_SUCCESS;
    if (quota != NULL) {
        /*
         * We don't attach to the quota; we just assign it, to
         * avoid increasing the quota count unnecessarily.
         */
        nsock->pquota = quota;
    }
    isc_quota_cb_init(&nsock->quotacb, quota_accept_cb, nsock);

    ievent = isc__nm_get_ievent(mgr, netievent_tcplisten);
    ievent->sock = nsock;
    if (isc__nm_in_netthread()) {
        nsock->tid = isc_nm_tid();
        isc__nm_async_tcplisten(&mgr->workers[nsock->tid],
                    (isc__netievent_t *)ievent);
        isc__nm_put_ievent(mgr, ievent);
    } else {
        nsock->tid = isc_random_uniform(mgr->nworkers);
        isc__nm_enqueue_ievent(&mgr->workers[nsock->tid],
                       (isc__netievent_t *)ievent);

        LOCK(&nsock->lock);
        while (!atomic_load(&nsock->listening) &&
               !atomic_load(&nsock->listen_error)) {
            WAIT(&nsock->cond, &nsock->lock);
        }
        UNLOCK(&nsock->lock);
    }

    if (nsock->result == ISC_R_SUCCESS) {
        *sockp = nsock;
        return (ISC_R_SUCCESS);
    } else {
        isc_result_t result = nsock->result;
        isc__nmsocket_detach(&nsock);
        return (result);
    }
}
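
/*
 * Usage sketch (illustrative only, not from the BIND sources): setting
 * up a TCP listener with isc_nm_listentcp() and the accept-callback
 * shape invoked from isc__nm_async_tcpchildaccept() below.  The
 * interface setup, quota, and backlog values are assumptions for
 * illustration.
 *
 *   static void
 *   accept_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
 *       if (result != ISC_R_SUCCESS) {
 *           return;
 *       }
 *       // Typically: start reading on the new connection (the read
 *       // entry point in this file is isc__nm_tcp_read()) and attach
 *       // to the handle if it must outlive this callback.
 *   }
 *
 *   isc_nmsocket_t *listensock = NULL;
 *   result = isc_nm_listentcp(mgr, &iface, accept_cb, NULL, 0,
 *                             10, quota, &listensock);
 *   // ... later, from outside a network thread:
 *   isc__nm_tcp_stoplistening(listensock);
 */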

/*
 * For multi-threaded TCP listening, we create a single socket,
 * bind to it, and start listening. On an incoming connection we accept
 * it, and then pass the accepted socket using the uv_export/uv_import
 * mechanism to a child thread.
 */
void
isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
    isc__netievent_tcplisten_t *ievent = (isc__netievent_tcplisten_t *)ev0;
    isc_nmsocket_t *sock = ievent->sock;
    struct sockaddr_storage sname;
    int r, flags = 0, snamelen = sizeof(sname);

    REQUIRE(isc__nm_in_netthread());
    REQUIRE(sock->type == isc_nm_tcplistener);

    r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
    if (r != 0) {
        isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
        /* The socket was never opened, so no need for uv_close() */
        atomic_store(&sock->closed, true);
        sock->result = isc__nm_uverr2result(r);
        atomic_store(&sock->listen_error, true);
        goto done;
    }

    isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]);

    if (sock->iface->addr.type.sa.sa_family == AF_INET6) {
        flags = UV_TCP_IPV6ONLY;
    }

    r = uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa,
            flags);
    if (r == UV_EADDRNOTAVAIL &&
        isc__nm_socket_freebind(&sock->uv_handle.handle) == ISC_R_SUCCESS)
    {
        /*
         * Retry binding with IP_FREEBIND (or equivalent option) if the
         * address is not available. This helps with IPv6 tentative
         * addresses which are reported by the route socket, although
         * named is not yet able to properly bind to them.
         */
        r = uv_tcp_bind(&sock->uv_handle.tcp,
                &sock->iface->addr.type.sa, flags);
    }

    if (r != 0) {
        isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
        uv_close(&sock->uv_handle.handle, tcp_close_cb);
        sock->result = isc__nm_uverr2result(r);
        atomic_store(&sock->listen_error, true);
        goto done;
    }

    /*
     * By doing this now, we can find out immediately whether bind()
     * failed, and quit if so. (uv_bind() uses a delayed error,
     * initially returning success even if bind() fails, and this
     * could cause a deadlock later if we didn't check first.)
     */
    r = uv_tcp_getsockname(&sock->uv_handle.tcp, (struct sockaddr *)&sname,
                   &snamelen);
    if (r != 0) {
        uv_close(&sock->uv_handle.handle, tcp_close_cb);
        sock->result = isc__nm_uverr2result(r);
        atomic_store(&sock->listen_error, true);
        goto done;
    }

    /*
     * The callback will run in the same thread uv_listen() was called
     * from, so a race with tcp_connection_cb() isn't possible.
     */
    r = uv_listen((uv_stream_t *)&sock->uv_handle.tcp, sock->backlog,
              tcp_connection_cb);
    if (r != 0) {
        isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
                  ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
                  "uv_listen failed: %s",
                  isc_result_totext(isc__nm_uverr2result(r)));
        uv_close(&sock->uv_handle.handle, tcp_close_cb);
        sock->result = isc__nm_uverr2result(r);
        atomic_store(&sock->listen_error, true);
        goto done;
    }

    uv_handle_set_data(&sock->uv_handle.handle, sock);

    atomic_store(&sock->listening, true);

done:
    LOCK(&sock->lock);
    SIGNAL(&sock->cond);
    UNLOCK(&sock->lock);
    return;
}

static void
tcp_connection_cb(uv_stream_t *server, int status) {
    isc_nmsocket_t *psock = uv_handle_get_data((uv_handle_t *)server);
    isc_result_t result;

    UNUSED(status);

    result = accept_connection(psock, NULL);
    if (result != ISC_R_SUCCESS && result != ISC_R_NOCONN) {
        if ((result != ISC_R_QUOTA && result != ISC_R_SOFTQUOTA) ||
            can_log_tcp_quota()) {
            isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
                      ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
                      "TCP connection failed: %s",
                      isc_result_totext(result));
        }
    }
}

void
isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
    isc__netievent_tcpchildaccept_t *ievent =
        (isc__netievent_tcpchildaccept_t *)ev0;
    isc_nmsocket_t *ssock = ievent->sock;
    isc_nmsocket_t *csock = NULL;
    isc_nmhandle_t *handle;
    isc_result_t result;
    struct sockaddr_storage ss;
    isc_sockaddr_t local;
    int r;

    REQUIRE(isc__nm_in_netthread());
    REQUIRE(ssock->type == isc_nm_tcplistener);

    csock = isc_mem_get(ssock->mgr->mctx, sizeof(isc_nmsocket_t));
    isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, ssock->iface);
    csock->tid = isc_nm_tid();
    csock->extrahandlesize = ssock->extrahandlesize;

    csock->quota = ievent->quota;
    ievent->quota = NULL;

    worker = &ssock->mgr->workers[isc_nm_tid()];
    uv_tcp_init(&worker->loop, &csock->uv_handle.tcp);

    r = isc_uv_import(&csock->uv_handle.stream, &ievent->streaminfo);
    if (r != 0) {
        isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
                  ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
                  "uv_import failed: %s",
                  isc_result_totext(isc__nm_uverr2result(r)));
        result = isc__nm_uverr2result(r);
        goto error;
    }

    r = uv_tcp_getpeername(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
                   &(int){ sizeof(ss) });
    if (r != 0) {
        result = isc__nm_uverr2result(r);
        goto error;
    }

    result = isc_sockaddr_fromsockaddr(&csock->peer,
                       (struct sockaddr *)&ss);
    if (result != ISC_R_SUCCESS) {
        goto error;
    }

    r = uv_tcp_getsockname(&csock->uv_handle.tcp, (struct sockaddr *)&ss,
                   &(int){ sizeof(ss) });
    if (r != 0) {
        result = isc__nm_uverr2result(r);
        goto error;
    }

    result = isc_sockaddr_fromsockaddr(&local, (struct sockaddr *)&ss);
    if (result != ISC_R_SUCCESS) {
        goto error;
    }

    isc__nmsocket_attach(ssock, &csock->server);

    handle = isc__nmhandle_get(csock, NULL, &local);

    INSIST(ssock->accept_cb.accept != NULL);
    csock->read_timeout = ssock->mgr->init;
    ssock->accept_cb.accept(handle, ISC_R_SUCCESS, ssock->accept_cbarg);

    /*
     * csock is now attached to the handle.
     */
    isc__nmsocket_detach(&csock);

    /*
     * If the accept callback wants to hold on to the handle,
     * it needs to attach to it.
     */
    isc_nmhandle_unref(handle);
    return;

error:
    /*
     * Detach the quota early to make room for other connections;
     * otherwise it'd be detached later asynchronously, and clog
     * the quota unnecessarily.
     */
    if (csock->quota != NULL) {
        isc_quota_detach(&csock->quota);
    }
    isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR,
              ISC_LOG_ERROR, "Accepting TCP connection failed: %s",
              isc_result_totext(result));

    /*
     * Detach the socket properly to make sure uv_close() is called.
     */
    isc__nmsocket_detach(&csock);
}

void
isc__nm_tcp_stoplistening(isc_nmsocket_t *sock) {
    isc__netievent_tcpstop_t *ievent = NULL;

    REQUIRE(VALID_NMSOCK(sock));
    REQUIRE(!isc__nm_in_netthread());

    ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstop);
    isc__nmsocket_attach(sock, &ievent->sock);
    isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
                   (isc__netievent_t *)ievent);
}

void
isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0) {
    isc__netievent_tcpstop_t *ievent = (isc__netievent_tcpstop_t *)ev0;
    isc_nmsocket_t *sock = ievent->sock;

    UNUSED(worker);

    REQUIRE(isc__nm_in_netthread());
    REQUIRE(VALID_NMSOCK(sock));
    REQUIRE(sock->type == isc_nm_tcplistener);

    /*
     * If the network manager is interlocked, re-enqueue the event for
     * later.
     */
    if (!isc__nm_acquire_interlocked(sock->mgr)) {
        isc__netievent_tcpstop_t *event = NULL;

        event = isc__nm_get_ievent(sock->mgr, netievent_tcpstop);
        event->sock = sock;
        isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
                       (isc__netievent_t *)event);
    } else {
        uv_close((uv_handle_t *)&sock->uv_handle.tcp,
             tcp_listenclose_cb);
        isc__nm_drop_interlocked(sock->mgr);
    }
}

/*
 * This callback is used for closing listening sockets.
 */
static void
tcp_listenclose_cb(uv_handle_t *handle) {
    isc_nmsocket_t *sock = uv_handle_get_data(handle);

    LOCK(&sock->lock);
    atomic_store(&sock->closed, true);
    atomic_store(&sock->listening, false);
    sock->pquota = NULL;
    UNLOCK(&sock->lock);

    isc__nmsocket_detach(&sock);
}

static void
readtimeout_cb(uv_timer_t *handle) {
    isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);

    REQUIRE(VALID_NMSOCK(sock));
    REQUIRE(sock->tid == isc_nm_tid());

    /*
     * The socket is actively processing something, so restart the
     * timer and return.
     */
    if (atomic_load(&sock->processing)) {
        uv_timer_start(handle, readtimeout_cb, sock->read_timeout, 0);
        return;
    }

    /*
     * Timeout; stop reading and process whatever we have.
     */
    uv_read_stop(&sock->uv_handle.stream);
    if (sock->quota) {
        isc_quota_detach(&sock->quota);
    }
    if (sock->rcb.recv != NULL) {
        sock->rcb.recv(sock->tcphandle, ISC_R_TIMEDOUT, NULL,
                   sock->rcbarg);
        isc__nmsocket_clearcb(sock);
    }
}

isc_result_t
isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
    isc_nmsocket_t *sock = NULL;
    isc__netievent_startread_t *ievent = NULL;

    REQUIRE(VALID_NMHANDLE(handle));
    REQUIRE(VALID_NMSOCK(handle->sock));

    sock = handle->sock;
    sock->rcb.recv = cb;
    sock->rcbarg = cbarg;

    ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstartread);
    ievent->sock = sock;

    if (sock->tid == isc_nm_tid()) {
        isc__nm_async_tcp_startread(&sock->mgr->workers[sock->tid],
                        (isc__netievent_t *)ievent);
        isc__nm_put_ievent(sock->mgr, ievent);
    } else {
        isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
                       (isc__netievent_t *)ievent);
    }

    return (ISC_R_SUCCESS);
}
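
/*
 * Usage sketch (illustrative only, not from the BIND sources): the
 * isc_nm_recv_cb_t shape that isc__nm_tcp_read() expects, matching how
 * read_cb() below invokes sock->rcb.recv.  The region handling is an
 * assumption for illustration.
 *
 *   static void
 *   recv_cb(isc_nmhandle_t *handle, isc_result_t result,
 *           isc_region_t *region, void *cbarg) {
 *       if (result != ISC_R_SUCCESS) {
 *           // e.g. ISC_R_TIMEDOUT, ISC_R_EOF, ISC_R_CANCELED;
 *           // 'region' is NULL in these cases.
 *           return;
 *       }
 *       // region->base/region->length point into the worker's receive
 *       // buffer and are only valid during this callback; copy the
 *       // data out if it must be kept.
 *   }
 *
 *   result = isc__nm_tcp_read(handle, recv_cb, NULL);
 */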

/*%<
 * Allocator for TCP read operations. Limited to size 2^16.
 *
 * Note this doesn't actually allocate anything; it just assigns the
 * worker's receive buffer to a socket, and marks it as "in use".
 */
static void
tcp_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
    isc_nmsocket_t *sock = uv_handle_get_data(handle);
    isc__networker_t *worker = NULL;

    REQUIRE(VALID_NMSOCK(sock));
    REQUIRE(sock->type == isc_nm_tcpsocket);
    REQUIRE(isc__nm_in_netthread());
    REQUIRE(size <= 65536);

    worker = &sock->mgr->workers[sock->tid];
    INSIST(!worker->recvbuf_inuse);

    buf->base = worker->recvbuf;
    buf->len = size;
    worker->recvbuf_inuse = true;
}

void
isc__nm_async_tcp_startread(isc__networker_t *worker, isc__netievent_t *ev0) {
    isc__netievent_startread_t *ievent = (isc__netievent_startread_t *)ev0;
    isc_nmsocket_t *sock = ievent->sock;
    int r;

    REQUIRE(worker->id == isc_nm_tid());
    if (sock->read_timeout != 0) {
        if (!sock->timer_initialized) {
            uv_timer_init(&worker->loop, &sock->timer);
            uv_handle_set_data((uv_handle_t *)&sock->timer, sock);
            sock->timer_initialized = true;
        }
        uv_timer_start(&sock->timer, readtimeout_cb, sock->read_timeout,
                   0);
    }

    r = uv_read_start(&sock->uv_handle.stream, tcp_alloc_cb, read_cb);
    if (r != 0) {
        isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]);
    }
}

isc_result_t
isc__nm_tcp_pauseread(isc_nmsocket_t *sock) {
    isc__netievent_pauseread_t *ievent = NULL;

    REQUIRE(VALID_NMSOCK(sock));

    if (atomic_load(&sock->readpaused)) {
        return (ISC_R_SUCCESS);
    }

    atomic_store(&sock->readpaused, true);
    ievent = isc__nm_get_ievent(sock->mgr, netievent_tcppauseread);
    ievent->sock = sock;

    if (sock->tid == isc_nm_tid()) {
        isc__nm_async_tcp_pauseread(&sock->mgr->workers[sock->tid],
                        (isc__netievent_t *)ievent);
        isc__nm_put_ievent(sock->mgr, ievent);
    } else {
        isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
                       (isc__netievent_t *)ievent);
    }

    return (ISC_R_SUCCESS);
}

void
isc__nm_async_tcp_pauseread(isc__networker_t *worker, isc__netievent_t *ev0) {
    isc__netievent_pauseread_t *ievent = (isc__netievent_pauseread_t *)ev0;
    isc_nmsocket_t *sock = ievent->sock;

    REQUIRE(VALID_NMSOCK(sock));
    REQUIRE(worker->id == isc_nm_tid());

    if (sock->timer_initialized) {
        uv_timer_stop(&sock->timer);
    }
    uv_read_stop(&sock->uv_handle.stream);
}

isc_result_t
isc__nm_tcp_resumeread(isc_nmsocket_t *sock) {
    isc__netievent_startread_t *ievent = NULL;

    REQUIRE(VALID_NMSOCK(sock));
    if (sock->rcb.recv == NULL) {
        return (ISC_R_CANCELED);
    }

    if (!atomic_load(&sock->readpaused)) {
        return (ISC_R_SUCCESS);
    }

    atomic_store(&sock->readpaused, false);

    ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstartread);
    ievent->sock = sock;

    if (sock->tid == isc_nm_tid()) {
        isc__nm_async_tcp_startread(&sock->mgr->workers[sock->tid],
                        (isc__netievent_t *)ievent);
        isc__nm_put_ievent(sock->mgr, ievent);
    } else {
        isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
                       (isc__netievent_t *)ievent);
    }

    return (ISC_R_SUCCESS);
}
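
/*
 * Usage sketch (illustrative only, not from the BIND sources): flow
 * control with the pause/resume pair above.  A consumer that falls
 * behind can pause reads (stopping the uv read and the read timer)
 * and resume them later; resuming fails with ISC_R_CANCELED once the
 * read callback has been cleared.  The 'backlogged' condition is a
 * hypothetical placeholder.
 *
 *   if (backlogged) {
 *       (void)isc__nm_tcp_pauseread(handle->sock);
 *   }
 *   // ... later, when the consumer has caught up:
 *   result = isc__nm_tcp_resumeread(handle->sock);
 *   if (result == ISC_R_CANCELED) {
 *       // No read callback is set anymore; the read was canceled.
 *   }
 */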

static void
read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
    isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)stream);

    REQUIRE(VALID_NMSOCK(sock));
    REQUIRE(buf != NULL);

    if (nread >= 0) {
        isc_region_t region = { .base = (unsigned char *)buf->base,
                    .length = nread };

        if (sock->rcb.recv != NULL) {
            sock->rcb.recv(sock->tcphandle, ISC_R_SUCCESS, &region,
                       sock->rcbarg);
        }

        sock->read_timeout = (atomic_load(&sock->keepalive)
                          ? sock->mgr->keepalive
                          : sock->mgr->idle);

        if (sock->timer_initialized && sock->read_timeout != 0) {
            /* The timer will be updated */
            uv_timer_start(&sock->timer, readtimeout_cb,
                       sock->read_timeout, 0);
        }

        isc__nm_free_uvbuf(sock, buf);
        return;
    }

    isc__nm_free_uvbuf(sock, buf);

    /*
     * This might happen if the inner socket is closing.  It means that
     * it's detached, so the socket will be closed.
     */
    if (sock->rcb.recv != NULL) {
        isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]);
        sock->rcb.recv(sock->tcphandle, ISC_R_EOF, NULL, sock->rcbarg);
        isc__nmsocket_clearcb(sock);
    }

    /*
     * We don't need to clean up now; the socket will be closed and
     * resources and quota reclaimed when handle is freed in
     * isc__nm_tcp_close().
     */
}

static void
quota_accept_cb(isc_quota_t *quota, void *sock0) {
    isc_nmsocket_t *sock = (isc_nmsocket_t *)sock0;
    isc__netievent_tcpaccept_t *ievent = NULL;

    REQUIRE(VALID_NMSOCK(sock));

    /*
     * Create a tcpaccept event and pass it using the async channel.
     */
    ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpaccept);
    ievent->sock = sock;
    ievent->quota = quota;
    isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
                   (isc__netievent_t *)ievent);
}

/*
 * This is called after we get a quota_accept_cb() callback.
 */
void
isc__nm_async_tcpaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
    isc_result_t result;
    isc__netievent_tcpaccept_t *ievent = (isc__netievent_tcpaccept_t *)ev0;

    REQUIRE(worker->id == ievent->sock->tid);

    result = accept_connection(ievent->sock, ievent->quota);
    if (result != ISC_R_SUCCESS && result != ISC_R_NOCONN) {
        if ((result != ISC_R_QUOTA && result != ISC_R_SOFTQUOTA) ||
            can_log_tcp_quota()) {
            isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
                      ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
                      "TCP connection failed: %s",
                      isc_result_totext(result));
        }
    }

    /*
     * The socket was attached just before we called isc_quota_attach_cb().
     */
    isc__nmsocket_detach(&ievent->sock);
}

/*
 * Close callback for uv_tcp_t structures created in accept_connection().
 */
static void
free_uvtcpt(uv_handle_t *uvs) {
    isc_mem_t *mctx = (isc_mem_t *)uv_handle_get_data(uvs);
    isc_mem_putanddetach(&mctx, uvs, sizeof(uv_tcp_t));
}

static isc_result_t
accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) {
    isc_result_t result;
    isc__netievent_tcpchildaccept_t *event = NULL;
    isc__networker_t *worker = NULL;
    uv_tcp_t *uvstream = NULL;
    isc_mem_t *mctx = NULL;
    int r, w;

    REQUIRE(VALID_NMSOCK(ssock));
    REQUIRE(ssock->tid == isc_nm_tid());

    if (!atomic_load_relaxed(&ssock->active) ||
        atomic_load_relaxed(&ssock->mgr->closing))
    {
        /* We're closing, bail */
        if (quota != NULL) {
            isc_quota_detach(&quota);
        }
        return (ISC_R_CANCELED);
    }

    /* We can be called directly or as a callback from quota */
    if (ssock->pquota != NULL && quota == NULL) {
        /*
         * We need to attach to ssock, because it might be queued
         * waiting for a TCP quota slot.  If so, then we'll detach it
         * later when the connection is accepted. (XXX: This may be
         * suboptimal; it might be better not to attach unless we
         * need to, but we'd risk a race then.)
         */
        isc_nmsocket_t *tsock = NULL;
        isc__nmsocket_attach(ssock, &tsock);
        result = isc_quota_attach_cb(ssock->pquota, &quota,
                         &ssock->quotacb);
        if (result == ISC_R_QUOTA) {
            isc__nm_incstats(ssock->mgr,
                     ssock->statsindex[STATID_ACCEPTFAIL]);
            return (result);
        }

        /*
         * We're under quota, so there's no need to wait;
         * detach the socket.
         */
        isc__nmsocket_detach(&tsock);
    }

    isc__nm_incstats(ssock->mgr, ssock->statsindex[STATID_ACCEPT]);

    worker = &ssock->mgr->workers[isc_nm_tid()];
    uvstream = isc_mem_get(ssock->mgr->mctx, sizeof(uv_tcp_t));

    isc_mem_attach(ssock->mgr->mctx, &mctx);
    uv_handle_set_data((uv_handle_t *)uvstream, mctx);
    mctx = NULL; /* Detached later in free_uvtcpt() */

    uv_tcp_init(&worker->loop, uvstream);

    r = uv_accept(&ssock->uv_handle.stream, (uv_stream_t *)uvstream);
    if (r != 0) {
        result = isc__nm_uverr2result(r);
        uv_close((uv_handle_t *)uvstream, free_uvtcpt);
        isc_quota_detach(&quota);
        return (result);
    }

    /* We have an accepted TCP socket; pass it to a random worker */
    w = isc_random_uniform(ssock->mgr->nworkers);
    event = isc__nm_get_ievent(ssock->mgr, netievent_tcpchildaccept);
    event->sock = ssock;
    event->quota = quota;

    r = isc_uv_export((uv_stream_t *)uvstream, &event->streaminfo);
    RUNTIME_CHECK(r == 0);

    uv_close((uv_handle_t *)uvstream, free_uvtcpt);

    if (w == isc_nm_tid()) {
        isc__nm_async_tcpchildaccept(&ssock->mgr->workers[w],
                         (isc__netievent_t *)event);
        isc__nm_put_ievent(ssock->mgr, event);
    } else {
        isc__nm_enqueue_ievent(&ssock->mgr->workers[w],
                       (isc__netievent_t *)event);
    }

    return (ISC_R_SUCCESS);
}

isc_result_t
isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
         void *cbarg) {
    isc_nmsocket_t *sock = handle->sock;
    isc__netievent_tcpsend_t *ievent = NULL;
    isc__nm_uvreq_t *uvreq = NULL;

    REQUIRE(sock->type == isc_nm_tcpsocket);

    uvreq = isc__nm_uvreq_get(sock->mgr, sock);
    uvreq->uvbuf.base = (char *)region->base;
    uvreq->uvbuf.len = region->length;
    uvreq->handle = handle;
    isc_nmhandle_ref(uvreq->handle);
    uvreq->cb.send = cb;
    uvreq->cbarg = cbarg;

    if (sock->tid == isc_nm_tid()) {
        /*
         * If we're in the same thread as the socket, we can send
         * the data directly.
         */
        return (tcp_send_direct(sock, uvreq));
    } else {
        /*
         * We need to create an event and pass it using the async
         * channel.
         */
        ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpsend);
        ievent->sock = sock;
        ievent->req = uvreq;
        isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
                       (isc__netievent_t *)ievent);
        return (ISC_R_SUCCESS);
    }

    return (ISC_R_UNEXPECTED);
}
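
/*
 * Usage sketch (illustrative only, not from the BIND sources): sending
 * with isc__nm_tcp_send() and the send-callback shape invoked from
 * tcp_send_cb() below.  Note that the region is not copied before
 * uv_write(), so its memory must stay valid until the callback runs.
 * The 'data'/'datalen' variables are hypothetical placeholders.
 *
 *   static void
 *   send_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
 *       // result is ISC_R_SUCCESS or a uv error mapped by
 *       // isc__nm_uverr2result(); the send buffer may be released here.
 *   }
 *
 *   isc_region_t region = { .base = data, .length = datalen };
 *   result = isc__nm_tcp_send(handle, &region, send_cb, NULL);
 */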

static void
tcp_send_cb(uv_write_t *req, int status) {
    isc_result_t result = ISC_R_SUCCESS;
    isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data;
    isc_nmsocket_t *sock = NULL;

    REQUIRE(VALID_UVREQ(uvreq));
    REQUIRE(VALID_NMHANDLE(uvreq->handle));

    if (status < 0) {
        result = isc__nm_uverr2result(status);
        isc__nm_incstats(uvreq->sock->mgr,
                 uvreq->sock->statsindex[STATID_SENDFAIL]);
    }

    uvreq->cb.send(uvreq->handle, result, uvreq->cbarg);

    sock = uvreq->handle->sock;
    isc_nmhandle_unref(uvreq->handle);
    isc__nm_uvreq_put(&uvreq, sock);
}

/*
 * Handle the 'tcpsend' async event: send a packet on the socket.
 */
void
isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ev0) {
    isc_result_t result;
    isc__netievent_tcpsend_t *ievent = (isc__netievent_tcpsend_t *)ev0;

    REQUIRE(worker->id == ievent->sock->tid);

    if (!atomic_load(&ievent->sock->active)) {
        return;
    }

    result = tcp_send_direct(ievent->sock, ievent->req);
    if (result != ISC_R_SUCCESS) {
        ievent->req->cb.send(ievent->req->handle, result,
                     ievent->req->cbarg);
        isc__nm_uvreq_put(&ievent->req, ievent->req->handle->sock);
    }
}

static isc_result_t
tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
    int r;

    REQUIRE(sock->tid == isc_nm_tid());
    REQUIRE(sock->type == isc_nm_tcpsocket);

    isc_nmhandle_ref(req->handle);
    r = uv_write(&req->uv_req.write, &sock->uv_handle.stream, &req->uvbuf,
             1, tcp_send_cb);
    if (r < 0) {
        isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
        req->cb.send(NULL, isc__nm_uverr2result(r), req->cbarg);
        isc__nm_uvreq_put(&req, sock);
        return (isc__nm_uverr2result(r));
    }

    return (ISC_R_SUCCESS);
}

static void
tcp_close_cb(uv_handle_t *uvhandle) {
    isc_nmsocket_t *sock = uv_handle_get_data(uvhandle);

    REQUIRE(VALID_NMSOCK(sock));

    isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);
    atomic_store(&sock->closed, true);
    atomic_store(&sock->connected, false);
    isc__nmsocket_prep_destroy(sock);
}

static void
timer_close_cb(uv_handle_t *uvhandle) {
    isc_nmsocket_t *sock = uv_handle_get_data(uvhandle);

    REQUIRE(VALID_NMSOCK(sock));

    if (sock->server != NULL) {
        isc__nmsocket_detach(&sock->server);
    }
    uv_close(&sock->uv_handle.handle, tcp_close_cb);
}

static void
tcp_close_direct(isc_nmsocket_t *sock) {
    REQUIRE(VALID_NMSOCK(sock));
    REQUIRE(sock->tid == isc_nm_tid());
    REQUIRE(sock->type == isc_nm_tcpsocket);

    if (sock->quota != NULL) {
        isc_quota_detach(&sock->quota);
    }
    if (sock->timer_initialized) {
        sock->timer_initialized = false;
        uv_timer_stop(&sock->timer);
        uv_close((uv_handle_t *)&sock->timer, timer_close_cb);
    } else {
        if (sock->server != NULL) {
            isc__nmsocket_detach(&sock->server);
        }
        uv_close(&sock->uv_handle.handle, tcp_close_cb);
    }
}

void
isc__nm_tcp_close(isc_nmsocket_t *sock) {
    REQUIRE(VALID_NMSOCK(sock));
    REQUIRE(sock->type == isc_nm_tcpsocket);

    if (sock->tid == isc_nm_tid()) {
        tcp_close_direct(sock);
    } else {
        /*
         * We need to create an event and pass it using the async
         * channel.
         */
        isc__netievent_tcpclose_t *ievent =
            isc__nm_get_ievent(sock->mgr, netievent_tcpclose);

        ievent->sock = sock;
        isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
                       (isc__netievent_t *)ievent);
    }
}

void
isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0) {
    isc__netievent_tcpclose_t *ievent = (isc__netievent_tcpclose_t *)ev0;

    REQUIRE(worker->id == ievent->sock->tid);

    tcp_close_direct(ievent->sock);
}

void
isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
    REQUIRE(VALID_NMSOCK(sock));

    if (sock->type == isc_nm_tcpsocket && sock->tcphandle != NULL &&
        sock->rcb.recv != NULL)
    {
        sock->rcb.recv(sock->tcphandle, ISC_R_CANCELED, NULL,
                   sock->rcbarg);
        isc__nmsocket_clearcb(sock);
    }
}

void
isc__nm_tcp_cancelread(isc_nmsocket_t *sock) {
    REQUIRE(VALID_NMSOCK(sock));

    if (sock->type == isc_nm_tcpsocket && sock->tcphandle != NULL &&
        sock->rcb.recv != NULL)
    {
        sock->rcb.recv(sock->tcphandle, ISC_R_CANCELED, NULL,
                   sock->rcbarg);
        isc__nmsocket_clearcb(sock);
    }
}